This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit a4dde3cdfb4ab2d39d364c96c7ec6ac1d607a363 Author: Alexey Serbin <[email protected]> AuthorDate: Mon Sep 12 15:36:53 2022 -0700 [compaction] style guide-related updates I was debugging one compaction issue and found that compaction-related code in compaction-test.cc isn't robust enough. This patch just updates the code in compaction.{h,cc} and compaction-test.cc to conform with the project's style guidelines. A follow-up patch is to make the code in compaction-test.cc more robust. There are no functional changes in this patch. Change-Id: I4ef1981dad33a6bc2d0911630fd73c95b91e45fe Reviewed-on: http://gerrit.cloudera.org:8080/18974 Tested-by: Kudu Jenkins Reviewed-by: Yifan Zhang <[email protected]> --- src/kudu/tablet/compaction-test.cc | 85 ++++++++++++++-------------- src/kudu/tablet/compaction.cc | 110 +++++++++++++++++++------------------ src/kudu/tablet/compaction.h | 38 ++++++------- 3 files changed, 119 insertions(+), 114 deletions(-) diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc index 654b95ce3..e33f841f2 100644 --- a/src/kudu/tablet/compaction-test.cc +++ b/src/kudu/tablet/compaction-test.cc @@ -27,6 +27,7 @@ #include <random> #include <string> #include <thread> +#include <type_traits> #include <unordered_set> #include <vector> @@ -61,7 +62,6 @@ #include "kudu/tablet/mutation.h" #include "kudu/tablet/mvcc.h" #include "kudu/tablet/rowset.h" -#include "kudu/tablet/rowset_metadata.h" #include "kudu/tablet/tablet-harness.h" #include "kudu/tablet/tablet-test-util.h" #include "kudu/tablet/tablet.h" @@ -92,23 +92,24 @@ DEFINE_int32(merge_benchmark_num_rows_per_rowset, 500000, DECLARE_string(block_manager); +using kudu::consensus::OpId; +using kudu::log::LogAnchorRegistry; using std::shared_ptr; using std::string; using std::thread; using std::unique_ptr; using std::unordered_set; using std::vector; +using strings::Substitute; namespace kudu { namespace tablet { -using consensus::OpId; -using log::LogAnchorRegistry; -using strings::Substitute; +class RowSetMetadata; -static const char *kRowKeyFormat = "hello %08d"; -static const size_t kLargeRollThreshold = 1024 * 1024 * 1024; // 1GB -static const size_t kSmallRollThreshold = 1024; // 1KB +constexpr const char* const kRowKeyFormat = "hello %08d"; +constexpr const size_t kLargeRollThreshold = 1024 * 1024 * 1024; // 1GB +constexpr const size_t kSmallRollThreshold = 1024; // 1KB class TestCompaction : public KuduRowSetTest { public: @@ -131,7 +132,7 @@ class TestCompaction : public KuduRowSetTest { // Insert n_rows rows of data. // Each row is the tuple: (string key=hello <n*10 + delta>, val=<n>) - void InsertRows(MemRowSet *mrs, int n_rows, int delta) { + void InsertRows(MemRowSet* mrs, int n_rows, int delta) { for (int32_t i = 0; i < n_rows; i++) { InsertRow(mrs, i * 10 + delta, i); } @@ -140,7 +141,7 @@ class TestCompaction : public KuduRowSetTest { // Inserts a row. // The 'nullable_val' column is set to either NULL (when val is odd) // or 'val' (when val is even). - void InsertRow(MemRowSet *mrs, int row_key, int32_t val) { + void InsertRow(MemRowSet* mrs, int row_key, int32_t val) { ScopedOp op(&mvcc_, clock_.Now()); op.StartApplying(); InsertRowInOp(mrs, op, row_key, val); @@ -179,7 +180,7 @@ class TestCompaction : public KuduRowSetTest { } } - void InsertRowInOp(MemRowSet *mrs, + void InsertRowInOp(MemRowSet* mrs, const ScopedOp& op, int row_key, int32_t val) { @@ -204,21 +205,21 @@ class TestCompaction : public KuduRowSetTest { // If 'val' is even, 'nullable_val' is set to NULL. Otherwise, set to 'val'. // Note that this is the opposite of InsertRow() above, so that the updates // flop NULL to non-NULL and vice versa. - void UpdateRows(RowSet *rowset, int n_rows, int delta, int32_t new_val) { + void UpdateRows(RowSet* rowset, int n_rows, int delta, int32_t new_val) { for (uint32_t i = 0; i < n_rows; i++) { SCOPED_TRACE(i); UpdateRow(rowset, i * 10 + delta, new_val); } } - void UpdateRow(RowSet *rowset, int row_key, int32_t new_val) { + void UpdateRow(RowSet* rowset, int row_key, int32_t new_val) { ScopedOp op(&mvcc_, clock_.Now()); op.StartApplying(); UpdateRowInOp(rowset, op, row_key, new_val); op.FinishApplying(); } - void UpdateRowInOp(RowSet *rowset, + void UpdateRowInOp(RowSet* rowset, const ScopedOp& op, int row_key, int32_t new_val) { @@ -274,7 +275,7 @@ class TestCompaction : public KuduRowSetTest { op.FinishApplying(); } - void DeleteRowInOp(RowSet *rowset, const ScopedOp& op, int row_key) { + void DeleteRowInOp(RowSet* rowset, const ScopedOp& op, int row_key) { char keybuf[256]; faststring update_buf; snprintf(keybuf, sizeof(keybuf), kRowKeyFormat, row_key); @@ -300,7 +301,7 @@ class TestCompaction : public KuduRowSetTest { // Iterate over the given compaction input, stringifying and dumping each // yielded row to *out - void IterateInput(CompactionInput *input, vector<string> *out) { + void IterateInput(CompactionInput* input, vector<string>* out) { ASSERT_OK(DebugDumpCompactionInput(input, out)); } @@ -308,8 +309,8 @@ class TestCompaction : public KuduRowSetTest { // If 'result_rowsets' is not NULL, reopens the resulting rowset(s) and appends // them to the vector. void DoFlushAndReopen( - CompactionInput *input, const Schema& projection, const MvccSnapshot &snap, - int64_t roll_threshold, vector<shared_ptr<DiskRowSet> >* result_rowsets) { + CompactionInput* input, const Schema& projection, const MvccSnapshot& snap, + int64_t roll_threshold, vector<shared_ptr<DiskRowSet>>* result_rowsets) { // Flush with a large roll threshold so we only write a single file. // This simplifies the test so we always need to reopen only a single rowset. RollingDiskRowSetWriter rsw(tablet()->metadata(), projection, @@ -321,14 +322,14 @@ class TestCompaction : public KuduRowSetTest { input, snap, HistoryGcOpts::Disabled(), &rsw)); ASSERT_OK(rsw.Finish()); - vector<shared_ptr<RowSetMetadata> > metas; + vector<shared_ptr<RowSetMetadata>> metas; rsw.GetWrittenRowSetMetadata(&metas); - for (const shared_ptr<RowSetMetadata>& meta : metas) { + for (const auto& meta : metas) { ASSERT_TRUE(meta->HasBloomDataBlockForTests()); } if (result_rowsets) { // Re-open the outputs - for (const shared_ptr<RowSetMetadata>& meta : metas) { + for (const auto& meta : metas) { shared_ptr<DiskRowSet> rs; ASSERT_OK(DiskRowSet::Open(meta, log_anchor_registry_.get(), mem_trackers_, nullptr, &rs)); @@ -338,11 +339,11 @@ class TestCompaction : public KuduRowSetTest { } static Status BuildCompactionInput(const MvccSnapshot& merge_snap, - const vector<shared_ptr<DiskRowSet> >& rowsets, + const vector<shared_ptr<DiskRowSet>>& rowsets, const Schema& projection, unique_ptr<CompactionInput>* out) { - vector<shared_ptr<CompactionInput> > merge_inputs; - for (const shared_ptr<DiskRowSet> &rs : rowsets) { + vector<shared_ptr<CompactionInput>> merge_inputs; + for (const auto& rs : rowsets) { unique_ptr<CompactionInput> input; RETURN_NOT_OK(CompactionInput::Create(*rs, &projection, merge_snap, nullptr, &input)); merge_inputs.push_back(shared_ptr<CompactionInput>(input.release())); @@ -354,9 +355,9 @@ class TestCompaction : public KuduRowSetTest { // Compacts a set of DRSs. // If 'result_rowsets' is not NULL, reopens the resulting rowset(s) and appends // them to the vector. - Status CompactAndReopen(const vector<shared_ptr<DiskRowSet> >& rowsets, + Status CompactAndReopen(const vector<shared_ptr<DiskRowSet>>& rowsets, const Schema& projection, int64_t roll_threshold, - vector<shared_ptr<DiskRowSet> >* result_rowsets) { + vector<shared_ptr<DiskRowSet>>* result_rowsets) { MvccSnapshot merge_snap(mvcc_); unique_ptr<CompactionInput> compact_input; RETURN_NOT_OK(BuildCompactionInput(merge_snap, rowsets, projection, &compact_input)); @@ -366,10 +367,10 @@ class TestCompaction : public KuduRowSetTest { } // Same as above, but sets a high roll threshold so it only produces a single output. - void CompactAndReopenNoRoll(const vector<shared_ptr<DiskRowSet> >& input_rowsets, + void CompactAndReopenNoRoll(const vector<shared_ptr<DiskRowSet>>& input_rowsets, const Schema& projection, shared_ptr<DiskRowSet>* result_rs) { - vector<shared_ptr<DiskRowSet> > result_rowsets; + vector<shared_ptr<DiskRowSet>> result_rowsets; CompactAndReopen(input_rowsets, projection, kLargeRollThreshold, &result_rowsets); ASSERT_EQ(1, result_rowsets.size()); *result_rs = result_rowsets[0]; @@ -380,9 +381,9 @@ class TestCompaction : public KuduRowSetTest { // them to the vector. void FlushMRSAndReopen(const MemRowSet& mrs, const Schema& projection, int64_t roll_threshold, - vector<shared_ptr<DiskRowSet> >* result_rowsets) { + vector<shared_ptr<DiskRowSet>>* result_rowsets) { MvccSnapshot snap(mvcc_); - vector<shared_ptr<RowSetMetadata> > rowset_metas; + vector<shared_ptr<RowSetMetadata>> rowset_metas; unique_ptr<CompactionInput> input(CompactionInput::Create(mrs, &projection, snap)); DoFlushAndReopen(input.get(), projection, snap, roll_threshold, result_rowsets); } @@ -390,7 +391,7 @@ class TestCompaction : public KuduRowSetTest { // Same as above, but sets a high roll threshold so it only produces a single output. void FlushMRSAndReopenNoRoll(const MemRowSet& mrs, const Schema& projection, shared_ptr<DiskRowSet>* result_rs) { - vector<shared_ptr<DiskRowSet> > rowsets; + vector<shared_ptr<DiskRowSet>> rowsets; FlushMRSAndReopen(mrs, projection, kLargeRollThreshold, &rowsets); ASSERT_EQ(1, rowsets.size()); *result_rs = rowsets[0]; @@ -421,11 +422,11 @@ class TestCompaction : public KuduRowSetTest { // each of the input schemas. The output rowset will // have the 'projection' schema. void DoMerge(const Schema& projection, const vector<Schema>& schemas) { - vector<shared_ptr<DiskRowSet> > rowsets; + vector<shared_ptr<DiskRowSet>> rowsets; // Create one input rowset for each of the input schemas int delta = 0; - for (const Schema& schema : schemas) { + for (const auto& schema : schemas) { // Create a memrowset with a bunch of rows and updates. shared_ptr<MemRowSet> mrs; CHECK_OK(MemRowSet::Create(delta, schema, log_anchor_registry_.get(), @@ -457,7 +458,7 @@ class TestCompaction : public KuduRowSetTest { template<bool OVERLAP_INPUTS> void DoBenchmark() { - vector<shared_ptr<DiskRowSet> > rowsets; + vector<shared_ptr<DiskRowSet>> rowsets; if (FLAGS_merge_benchmark_input_dir.empty()) { // Create inputs. @@ -494,7 +495,7 @@ class TestCompaction : public KuduRowSetTest { scoped_refptr<TabletMetadata> input_meta; ASSERT_OK(TabletMetadata::Load(&fs_manager, tablet_id, &input_meta)); - for (const shared_ptr<RowSetMetadata>& meta : input_meta->rowsets()) { + for (const auto& meta : input_meta->rowsets()) { shared_ptr<DiskRowSet> rs; CHECK_OK(DiskRowSet::Open(meta, log_anchor_registry_.get(), mem_trackers_, nullptr, &rs)); @@ -576,7 +577,7 @@ TEST_F(TestCompaction, TestFlushMRSWithRolling) { mem_trackers_.tablet_tracker, &mrs)); InsertRows(mrs.get(), 30000, 0); - vector<shared_ptr<DiskRowSet> > rowsets; + vector<shared_ptr<DiskRowSet>> rowsets; FlushMRSAndReopen(*mrs, schema_, kSmallRollThreshold, &rowsets); ASSERT_GT(rowsets.size(), 1); @@ -676,7 +677,7 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsMerging) { } shared_ptr<DiskRowSet> result; - vector<shared_ptr<DiskRowSet> > all_rss; + vector<shared_ptr<DiskRowSet>> all_rss; all_rss.push_back(rs3); all_rss.push_back(rs1); all_rss.push_back(rs2); @@ -857,7 +858,7 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) { } vector<shared_ptr<CompactionInput>> inputs; - for (auto& row_set : row_sets) { + for (const auto& row_set : row_sets) { unique_ptr<CompactionInput> ci; CHECK_OK(row_set->NewCompactionInput(&schema_, all_snap, nullptr, &ci)); inputs.push_back(shared_ptr<CompactionInput>(ci.release())); @@ -1030,7 +1031,7 @@ TEST_F(TestCompaction, TestKUDU102) { // Catch the updates that came in after the snapshot flush was made. // Note that we are merging two MRS, it's a hack MvccSnapshot snap2(mvcc_); - vector<shared_ptr<CompactionInput> > merge_inputs; + vector<shared_ptr<CompactionInput>> merge_inputs; merge_inputs.push_back( shared_ptr<CompactionInput>(CompactionInput::Create(*mrs, &schema_, snap2))); merge_inputs.push_back( @@ -1093,7 +1094,7 @@ TEST_F(TestCompaction, TestMergeMRS) { InsertRows(mrs_a.get(), 5, 1); MvccSnapshot snap(mvcc_); - vector<shared_ptr<CompactionInput> > merge_inputs { + vector<shared_ptr<CompactionInput>> merge_inputs { shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)), shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap)) }; @@ -1113,7 +1114,7 @@ TEST_F(TestCompaction, TestMergeMRSWithInvisibleRows) { mem_trackers_.tablet_tracker, &mrs_b)); InsertRows(mrs_b.get(), 10, 0); MvccSnapshot snap(mvcc_); - vector<shared_ptr<CompactionInput> > merge_inputs { + vector<shared_ptr<CompactionInput>> merge_inputs { shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)), shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap)) }; @@ -1238,7 +1239,7 @@ TEST_F(TestCompaction, TestRowHistoryJumpsBetweenRowsets) { // Despite the overlapping time ranges across these inputs, the compaction // should go off without a hitch. MvccSnapshot snap(mvcc_); - vector<shared_ptr<CompactionInput> > merge_inputs { + vector<shared_ptr<CompactionInput>> merge_inputs { shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)), shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap)), shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_c, &schema_, snap)), @@ -1255,7 +1256,7 @@ TEST_F(TestCompaction, TestMergeMRSWithAllInvisibleRows) { shared_ptr<MemRowSet> mrs_a = CreateInvisibleMRS(); shared_ptr<MemRowSet> mrs_b = CreateInvisibleMRS(); MvccSnapshot snap(mvcc_); - vector<shared_ptr<CompactionInput> > merge_inputs { + vector<shared_ptr<CompactionInput>> merge_inputs { shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)), shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap)) }; diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc index 83a08f0e8..762c92b08 100644 --- a/src/kudu/tablet/compaction.cc +++ b/src/kudu/tablet/compaction.cc @@ -23,6 +23,7 @@ #include <memory> #include <ostream> #include <string> +#include <type_traits> #include <unordered_set> #include <vector> @@ -47,7 +48,6 @@ #include "kudu/gutil/port.h" #include "kudu/gutil/stl_util.h" #include "kudu/gutil/strings/substitute.h" -#include "kudu/tablet/cfile_set.h" #include "kudu/tablet/delta_store.h" #include "kudu/tablet/delta_tracker.h" #include "kudu/tablet/diskrowset.h" @@ -138,7 +138,7 @@ class MemRowSetCompactionInput : public CompactionInput { return has_more_blocks_; } - Status PrepareBlock(vector<CompactionInputRow> *block) override { + Status PrepareBlock(vector<CompactionInputRow>* block) override { int num_in_block = iter_->remaining_in_leaf(); block->resize(num_in_block); @@ -203,7 +203,7 @@ class MemRowSetCompactionInput : public CompactionInput { return Status::OK(); } - const Schema &schema() const override { + const Schema& schema() const override { return iter_->schema(); } @@ -234,8 +234,8 @@ class DiskRowSetCompactionInput : public CompactionInput { undo_delta_iter_(std::move(undo_delta_iter)), mem_(32 * 1024), block_(&base_iter_->schema(), kRowsPerBlock, &mem_), - redo_mutation_block_(kRowsPerBlock, static_cast<Mutation *>(nullptr)), - undo_mutation_block_(kRowsPerBlock, static_cast<Mutation *>(nullptr)) {} + redo_mutation_block_(kRowsPerBlock, static_cast<Mutation*>(nullptr)), + undo_mutation_block_(kRowsPerBlock, static_cast<Mutation*>(nullptr)) {} Status Init() override { ScanSpec spec; @@ -252,12 +252,12 @@ class DiskRowSetCompactionInput : public CompactionInput { return base_iter_->HasNext(); } - Status PrepareBlock(vector<CompactionInputRow> *block) override { + Status PrepareBlock(vector<CompactionInputRow>* block) override { RETURN_NOT_OK(base_iter_->NextBlock(&block_)); std::fill(redo_mutation_block_.begin(), redo_mutation_block_.end(), - static_cast<Mutation *>(nullptr)); + static_cast<Mutation*>(nullptr)); std::fill(undo_mutation_block_.begin(), undo_mutation_block_.end(), - static_cast<Mutation *>(nullptr)); + static_cast<Mutation*>(nullptr)); RETURN_NOT_OK(redo_delta_iter_->PrepareBatch( block_.nrows(), DeltaIterator::PREPARE_FOR_COLLECT)); RETURN_NOT_OK(redo_delta_iter_->CollectMutations(&redo_mutation_block_, block_.arena())); @@ -267,7 +267,7 @@ class DiskRowSetCompactionInput : public CompactionInput { block->resize(block_.nrows()); for (int i = 0; i < block_.nrows(); i++) { - CompactionInputRow &input_row = block->at(i); + CompactionInputRow& input_row = block->at(i); input_row.row.Reset(&block_, i); input_row.redo_head = redo_mutation_block_[i]; Mutation::ReverseMutationList(&input_row.redo_head); @@ -284,7 +284,7 @@ class DiskRowSetCompactionInput : public CompactionInput { return Status::OK(); } - const Schema &schema() const override { + const Schema& schema() const override { return base_iter_->schema(); } @@ -298,8 +298,8 @@ class DiskRowSetCompactionInput : public CompactionInput { // The current block of data which has come from the input iterator RowBlock block_; - vector<Mutation *> redo_mutation_block_; - vector<Mutation *> undo_mutation_block_; + vector<Mutation*> redo_mutation_block_; + vector<Mutation*> undo_mutation_block_; enum { kRowsPerBlock = 100 @@ -382,7 +382,9 @@ int CompareDuplicatedRows(const CompactionInputRow& left, CHECK(right_last->changelist().is_delete()); } #endif - if (redos_overlap) *redos_overlap = false; + if (redos_overlap) { + *redos_overlap = false; + } return 1; } if (right.redo_head == nullptr) { @@ -397,7 +399,9 @@ int CompareDuplicatedRows(const CompactionInputRow& left, CHECK(left_last->changelist().is_delete()); } #endif - if (redos_overlap) *redos_overlap = false; + if (redos_overlap) { + *redos_overlap = false; + } return -1; } AdvanceToLastInList(&right_last); @@ -477,7 +481,7 @@ int CompareDuplicatedRows(const CompactionInputRow& left, void CopyMutations(Mutation* from, Mutation** to, Arena* arena) { Mutation* previous = nullptr; - for (const Mutation* cur = from; cur != nullptr; cur = cur->acquire_next()) { + for (const auto* cur = from; cur != nullptr; cur = cur->acquire_next()) { Mutation* copy = Mutation::CreateInArena(arena, cur->timestamp(), cur->changelist()); @@ -558,7 +562,7 @@ class MergeCompactionInput : public CompactionInput { // row of this block is less than the first row of the other block. // In this case, we can remove the other input from the merge until // this input's current block has been exhausted. - bool Dominates(const MergeState &other, const Schema &schema) const { + bool Dominates(const MergeState& other, const Schema& schema) const { DCHECK(!empty()); DCHECK(!other.empty()); @@ -569,15 +573,15 @@ class MergeCompactionInput : public CompactionInput { vector<CompactionInputRow> pending; int pending_idx; - vector<MergeState *> dominated; + vector<MergeState*> dominated; }; public: - MergeCompactionInput(const vector<shared_ptr<CompactionInput> > &inputs, + MergeCompactionInput(const vector<shared_ptr<CompactionInput>>& inputs, const Schema* schema) : schema_(schema), num_dup_rows_(0) { - for (const shared_ptr<CompactionInput>& input : inputs) { + for (const auto& input : inputs) { unique_ptr<MergeState> state(new MergeState); state->input = input; states_.push_back(state.release()); @@ -589,7 +593,7 @@ class MergeCompactionInput : public CompactionInput { } Status Init() override { - for (MergeState *state : states_) { + for (auto* state : states_) { RETURN_NOT_OK(state->input->Init()); } @@ -601,7 +605,7 @@ class MergeCompactionInput : public CompactionInput { bool HasMoreBlocks() override { // Return true if any of the input blocks has more rows pending // or more blocks which have yet to be pulled. - for (MergeState *state : states_) { + for (auto* state : states_) { if (!state->empty() || state->input->HasMoreBlocks()) { return true; @@ -611,7 +615,7 @@ class MergeCompactionInput : public CompactionInput { return false; } - Status PrepareBlock(vector<CompactionInputRow> *block) override { + Status PrepareBlock(vector<CompactionInputRow>* block) override { CHECK(!states_.empty()); block->clear(); @@ -627,7 +631,7 @@ class MergeCompactionInput : public CompactionInput { // but some benchmarks indicated that the simpler code path of the O(n k) merge // actually ends up a bit faster. for (int i = 0; i < states_.size(); i++) { - MergeState *state = states_[i]; + MergeState* state = states_[i]; if (state->empty()) { prepared_block_arena_ = state->input->PreparedBlockArena(); @@ -710,7 +714,7 @@ class MergeCompactionInput : public CompactionInput { return ProcessEmptyInputs(); } - const Schema &schema() const override { + const Schema& schema() const override { return *schema_; } @@ -725,7 +729,7 @@ class MergeCompactionInput : public CompactionInput { Status ProcessEmptyInputs() { int j = 0; for (int i = 0; i < states_.size(); i++) { - MergeState *state = states_[i]; + MergeState* state = states_[i]; states_[j++] = state; if (!state->empty()) { @@ -757,7 +761,7 @@ class MergeCompactionInput : public CompactionInput { // all of those dominance relations and remove any that are no longer // valid. for (auto it = state->dominated.begin(); it != state->dominated.end(); ++it) { - MergeState *dominated = *it; + auto* dominated = *it; if (!state->Dominates(*dominated, *schema_)) { states_.push_back(dominated); it = state->dominated.erase(it); @@ -793,7 +797,7 @@ class MergeCompactionInput : public CompactionInput { return Status::OK(); } - bool TryInsertIntoDominanceList(MergeState *dominator, MergeState *candidate) { + bool TryInsertIntoDominanceList(MergeState* dominator, MergeState* candidate) { if (dominator->Dominates(*candidate, *schema_)) { dominator->dominated.push_back(candidate); return true; @@ -883,7 +887,7 @@ class MergeCompactionInput : public CompactionInput { } const Schema* schema_; - vector<MergeState *> states_; + vector<MergeState*> states_; Arena* prepared_block_arena_; // Vector to keep blocks that store duplicated row data. @@ -1048,9 +1052,9 @@ string CompactionInputRowToString(const CompactionInputRow& input_row) { //////////////////////////////////////////////////////////// -Status CompactionInput::Create(const DiskRowSet &rowset, +Status CompactionInput::Create(const DiskRowSet& rowset, const Schema* projection, - const MvccSnapshot &snap, + const MvccSnapshot& snap, const IOContext* io_context, unique_ptr<CompactionInput>* out) { CHECK(projection->has_column_ids()); @@ -1082,28 +1086,28 @@ Status CompactionInput::Create(const DiskRowSet &rowset, return Status::OK(); } -CompactionInput *CompactionInput::Create(const MemRowSet &memrowset, +CompactionInput* CompactionInput::Create(const MemRowSet& memrowset, const Schema* projection, - const MvccSnapshot &snap) { + const MvccSnapshot& snap) { CHECK(projection->has_column_ids()); return new MemRowSetCompactionInput(memrowset, snap, projection); } -CompactionInput *CompactionInput::Merge(const vector<shared_ptr<CompactionInput> > &inputs, +CompactionInput* CompactionInput::Merge(const vector<shared_ptr<CompactionInput>>& inputs, const Schema* schema) { CHECK(schema->has_column_ids()); return new MergeCompactionInput(inputs, schema); } -Status RowSetsInCompaction::CreateCompactionInput(const MvccSnapshot &snap, +Status RowSetsInCompaction::CreateCompactionInput(const MvccSnapshot& snap, const Schema* schema, const IOContext* io_context, - shared_ptr<CompactionInput> *out) const { + shared_ptr<CompactionInput>* out) const { CHECK(schema->has_column_ids()); - vector<shared_ptr<CompactionInput> > inputs; - for (const shared_ptr<RowSet> &rs : rowsets_) { + vector<shared_ptr<CompactionInput>> inputs; + for (const auto& rs : rowsets_) { unique_ptr<CompactionInput> input; RETURN_NOT_OK_PREPEND(rs->NewCompactionInput(schema, snap, io_context, &input), Substitute("Could not create compaction input for rowset $0", @@ -1123,7 +1127,7 @@ Status RowSetsInCompaction::CreateCompactionInput(const MvccSnapshot &snap, void RowSetsInCompaction::DumpToLog() const { VLOG(1) << "Selected " << rowsets_.size() << " rowsets to compact:"; // Dump the selected rowsets to the log, and collect corresponding iterators. - for (const shared_ptr<RowSet> &rs : rowsets_) { + for (const auto& rs : rowsets_) { VLOG(1) << rs->ToString() << "(current size on disk: ~" << rs->OnDiskSize() << " bytes)"; } @@ -1153,8 +1157,8 @@ void RemoveAncientUndos(const HistoryGcOpts& history_gc_opts, DVLOG(5) << "Ancient history mark: " << history_gc_opts.ancient_history_mark().ToString() << ": " << HybridClock::StringifyTimestamp(history_gc_opts.ancient_history_mark()); - Mutation *prev_undo = nullptr; - Mutation *undo_mut = *undo_head; + Mutation* prev_undo = nullptr; + Mutation* undo_mut = *undo_head; while (undo_mut != nullptr) { if (history_gc_opts.IsAncientHistory(undo_mut->timestamp())) { // Drop all undos following this one in the list; Their timestamps will be lower. @@ -1199,7 +1203,7 @@ Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& snap, Mutation* redo_delete = nullptr; // Convert the redos into undos. - for (const Mutation *redo_mut = src_row.redo_head; + for (const auto* redo_mut = src_row.redo_head; redo_mut != nullptr; redo_mut = redo_mut->acquire_next()) { @@ -1426,11 +1430,11 @@ Status FlushCompactionInput(const string& tablet_id, } Status ReupdateMissedDeltas(const IOContext* io_context, - CompactionInput *input, + CompactionInput* input, const HistoryGcOpts& history_gc_opts, - const MvccSnapshot &snap_to_exclude, - const MvccSnapshot &snap_to_include, - const RowSetVector &output_rowsets) { + const MvccSnapshot& snap_to_exclude, + const MvccSnapshot& snap_to_include, + const RowSetVector& output_rowsets) { TRACE_EVENT0("tablet", "ReupdateMissedDeltas"); RETURN_NOT_OK(input->Init()); @@ -1438,9 +1442,9 @@ Status ReupdateMissedDeltas(const IOContext* io_context, snap_to_exclude.ToString() << " and " << snap_to_include.ToString(); // Collect the disk rowsets that we'll push the updates into. - deque<DiskRowSet *> diskrowsets; - for (const shared_ptr<RowSet> &rs : output_rowsets) { - diskrowsets.push_back(down_cast<DiskRowSet *>(rs.get())); + deque<DiskRowSet*> diskrowsets; + for (const auto& rs : output_rowsets) { + diskrowsets.push_back(down_cast<DiskRowSet*>(rs.get())); } // The set of updated delta trackers. @@ -1462,11 +1466,11 @@ Status ReupdateMissedDeltas(const IOContext* io_context, while (input->HasMoreBlocks()) { RETURN_NOT_OK(input->PrepareBlock(&rows)); - for (const CompactionInputRow &row : rows) { + for (const auto& row : rows) { DVLOG(4) << "Revisiting row: " << CompactionInputRowToString(row); bool is_garbage_collected = false; - for (const Mutation *mut = row.redo_head; + for (const auto* mut = row.redo_head; mut != nullptr; mut = mut->acquire_next()) { is_garbage_collected = false; @@ -1588,7 +1592,7 @@ Status ReupdateMissedDeltas(const IOContext* io_context, { TRACE_EVENT0("tablet", "Flushing missed deltas"); - for (DeltaTracker* tracker : updated_trackers) { + for (auto* tracker : updated_trackers) { VLOG(1) << "Flushing DeltaTracker updated with missed deltas..."; RETURN_NOT_OK_PREPEND(tracker->Flush(io_context, DeltaTracker::NO_FLUSH_METADATA), "Could not flush delta tracker after missed delta update"); @@ -1599,14 +1603,14 @@ Status ReupdateMissedDeltas(const IOContext* io_context, } -Status DebugDumpCompactionInput(CompactionInput *input, vector<string> *lines) { +Status DebugDumpCompactionInput(CompactionInput* input, vector<string>* lines) { RETURN_NOT_OK(input->Init()); vector<CompactionInputRow> rows; while (input->HasMoreBlocks()) { RETURN_NOT_OK(input->PrepareBlock(&rows)); - for (const CompactionInputRow &input_row : rows) { + for (const auto& input_row : rows) { LOG_STRING(INFO, lines) << CompactionInputRowToString(input_row); } diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h index 7ffd7e36c..f6e7dae4e 100644 --- a/src/kudu/tablet/compaction.h +++ b/src/kudu/tablet/compaction.h @@ -104,25 +104,25 @@ class CompactionInput { // need to call snap.IsApplied() on each mutation. // // TODO: can we make the above less messy? - static Status Create(const DiskRowSet &rowset, + static Status Create(const DiskRowSet& rowset, const Schema* projection, - const MvccSnapshot &snap, + const MvccSnapshot& snap, const fs::IOContext* io_context, std::unique_ptr<CompactionInput>* out); // Create an input which reads from the given memrowset, yielding base rows and updates // prior to the given snapshot. - static CompactionInput *Create(const MemRowSet &memrowset, + static CompactionInput* Create(const MemRowSet& memrowset, const Schema* projection, - const MvccSnapshot &snap); + const MvccSnapshot& snap); // Create an input which merges several other compaction inputs. The inputs are merged // in key-order according to the given schema. All inputs must have matching schemas. - static CompactionInput *Merge(const std::vector<std::shared_ptr<CompactionInput> > &inputs, - const Schema *schema); + static CompactionInput* Merge(const std::vector<std::shared_ptr<CompactionInput>>& inputs, + const Schema* schema); virtual Status Init() = 0; - virtual Status PrepareBlock(std::vector<CompactionInputRow> *block) = 0; + virtual Status PrepareBlock(std::vector<CompactionInputRow>* block) = 0; // Returns the arena for this compaction input corresponding to the last // prepared block. This must be called *after* PrepareBlock() as if this @@ -132,7 +132,7 @@ class CompactionInput { virtual Status FinishBlock() = 0; virtual bool HasMoreBlocks() = 0; - virtual const Schema &schema() const = 0; + virtual const Schema& schema() const = 0; virtual ~CompactionInput() {} }; @@ -140,7 +140,7 @@ class CompactionInput { // The set of rowsets which are taking part in a given compaction. class RowSetsInCompaction { public: - void AddRowSet(const std::shared_ptr<RowSet> &rowset, + void AddRowSet(const std::shared_ptr<RowSet>& rowset, std::unique_lock<std::mutex> lock) { CHECK(lock.owns_lock()); @@ -153,15 +153,15 @@ class RowSetsInCompaction { // // 'schema' is the schema for the output of the compaction, and must remain valid // for the lifetime of the returned CompactionInput. - Status CreateCompactionInput(const MvccSnapshot &snap, + Status CreateCompactionInput(const MvccSnapshot& snap, const Schema* schema, const fs::IOContext* io_context, - std::shared_ptr<CompactionInput> *out) const; + std::shared_ptr<CompactionInput>* out) const; // Dump a log message indicating the chosen rowsets. void DumpToLog() const; - const RowSetVector &rowsets() const { return rowsets_; } + const RowSetVector& rowsets() const { return rowsets_; } size_t num_rowsets() const { return rowsets_.size(); @@ -230,9 +230,9 @@ Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& snap, Status FlushCompactionInput(const std::string& tablet_id, const fs::FsErrorManager* error_manager, CompactionInput* input, - const MvccSnapshot &snap, + const MvccSnapshot& snap, const HistoryGcOpts& history_gc_opts, - RollingDiskRowSetWriter *out); + RollingDiskRowSetWriter* out); // Iterate through this compaction input, finding any mutations which came // between snap_to_exclude and snap_to_include (ie those ops that were not yet @@ -245,15 +245,15 @@ Status FlushCompactionInput(const std::string& tablet_id, // After return of this function, this CompactionInput object is "used up" and will // yield no further rows. Status ReupdateMissedDeltas(const fs::IOContext* io_context, - CompactionInput *input, + CompactionInput* input, const HistoryGcOpts& history_gc_opts, - const MvccSnapshot &snap_to_exclude, - const MvccSnapshot &snap_to_include, - const RowSetVector &output_rowsets); + const MvccSnapshot& snap_to_exclude, + const MvccSnapshot& snap_to_include, + const RowSetVector& output_rowsets); // Dump the given compaction input to 'lines' or LOG(INFO) if it is NULL. // This consumes all of the input in the compaction input. -Status DebugDumpCompactionInput(CompactionInput *input, std::vector<std::string> *lines); +Status DebugDumpCompactionInput(CompactionInput* input, std::vector<std::string>* lines); // Helper methods to print a row with full history. std::string RowToString(const RowBlockRow& row,
