This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 48ee1e8b5 [compaction] Code cleanup and readability improvement
48ee1e8b5 is described below
commit 48ee1e8b5d7792384178c75840a998e413aaa512
Author: Ashwani Raina <[email protected]>
AuthorDate: Tue Jan 23 17:50:11 2024 +0530
[compaction] Code cleanup and readability improvement
This is a base patch that does not change any functionality.
The goal is to break the compaction memory usage improvement
change into smaller ones to make it easier to review.
Change-Id: I54709b5e27751581c889854911323fbddab1c4ab
Reviewed-on: http://gerrit.cloudera.org:8080/21098
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/tablet/compaction.cc | 269 ++++++++++++++++++++++--------------
src/kudu/tablet/compaction.h | 17 ++-
src/kudu/tablet/delta_compaction.cc | 16 +--
3 files changed, 178 insertions(+), 124 deletions(-)
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index d56b79b78..9ad3a1576 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -990,11 +990,11 @@ Mutation* MergeUndoHistories(Mutation* left, Mutation*
right) {
// and adds them to 'new_undo_head'.
Status MergeDuplicatedRowHistory(const string& tablet_id,
const scoped_refptr<FsErrorManager>&
error_manager,
- CompactionInputRow* old_row,
- Mutation** new_undo_head,
+ const CompactionInputRow& old_row,
Arena* arena,
- const HistoryGcOpts& history_gc_opts) {
- if (PREDICT_TRUE(old_row->previous_ghost == nullptr)) return Status::OK();
+ const HistoryGcOpts& history_gc_opts,
+ Mutation** new_undo_head) {
+ if (PREDICT_TRUE(old_row.previous_ghost == nullptr)) return Status::OK();
// Use an all inclusive snapshot as all of the previous version's undos and
redos
// are guaranteed to be committed, otherwise the compaction wouldn't be able
to
@@ -1003,7 +1003,7 @@ Status MergeDuplicatedRowHistory(const string& tablet_id,
faststring dst;
- CompactionInputRow* previous_ghost = old_row->previous_ghost;
+ CompactionInputRow* previous_ghost = old_row.previous_ghost;
while (previous_ghost != nullptr) {
// First step is to transform the old rows REDO's into UNDOs, if there are
any.
@@ -1016,11 +1016,11 @@ Status MergeDuplicatedRowHistory(const string&
tablet_id,
RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(all_snap,
*previous_ghost,
- &pv_new_undos_head,
- &pv_delete_redo,
arena,
+ history_gc_opts,
&previous_ghost->row,
- history_gc_opts));
+ &pv_new_undos_head,
+ &pv_delete_redo));
// We should be left with only one redo, the delete.
#ifndef NDEBUG
@@ -1179,13 +1179,11 @@ void RowSetsInCompactionOrFlush::DumpToLog() const {
}
}
-void RemoveAncientUndos(const HistoryGcOpts& history_gc_opts,
- Mutation** undo_head,
+bool RemoveAncientUndos(const HistoryGcOpts& history_gc_opts,
const Mutation* redo_head,
- bool* is_garbage_collected) {
- *is_garbage_collected = false;
+ Mutation** undo_head) {
if (!history_gc_opts.gc_enabled()) {
- return;
+ return false;
}
// Make sure there is at most one REDO in the redo_head and that, if
present, it's a DELETE.
@@ -1195,8 +1193,7 @@ void RemoveAncientUndos(const HistoryGcOpts&
history_gc_opts,
// Garbage collect rows that are deleted before the AHM.
if (history_gc_opts.IsAncientHistory(redo_head->timestamp())) {
- *is_garbage_collected = true;
- return;
+ return true;
}
}
@@ -1218,6 +1215,7 @@ void RemoveAncientUndos(const HistoryGcOpts&
history_gc_opts,
prev_undo = undo_mut;
undo_mut = undo_mut->next();
}
+ return false;
}
// Applies the REDOs of 'src_row' in accordance with the input snapshot,
@@ -1228,11 +1226,11 @@ void RemoveAncientUndos(const HistoryGcOpts&
history_gc_opts,
// NOTE: input REDOs are expected to be in increasing timestamp order.
Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& snap,
const CompactionInputRow& src_row,
- Mutation** new_undo_head,
- Mutation** new_redo_head,
Arena* arena,
+ const HistoryGcOpts& history_gc_opts,
RowBlockRow* dst_row,
- const HistoryGcOpts& history_gc_opts) {
+ Mutation** new_undo_head,
+ Mutation** new_redo_head) {
bool is_deleted = false;
#define ERROR_LOG_CONTEXT \
@@ -1390,6 +1388,135 @@ Status ApplyMutationsAndGenerateUndos(const
MvccSnapshot& snap,
#undef ERROR_LOG_CONTEXT
}
+// Appends the UNDO and REDO deltas of the row at 'dst_row' to the
+// DiskRowSetWriter output 'out'. Either list head may be null, in which case
+// nothing is appended for that list.
+//
+// Returns the first non-OK status from AppendUndoDeltas()/AppendRedoDeltas().
+static Status AppendDeltasToDRS(RollingDiskRowSetWriter* out,
+                                Mutation* new_undos_head,
+                                Mutation* new_redos_head,
+                                RowBlockRow* dst_row) {
+  // Row index within the current DRS. It is only written as an out-parameter
+  // of the Append*Deltas() calls below. When neither list is present, no such
+  // call is made. Track that explicitly, so the log statement never reads an
+  // uninitialized value.
+  rowid_t index_in_current_drs = 0;
+  bool has_index_in_current_drs = false;
+
+  if (new_undos_head != nullptr) {
+    // Append UNDO deltas to DiskRowSetWriter output.
+    RETURN_NOT_OK(out->AppendUndoDeltas(dst_row->row_index(),
+                                        new_undos_head,
+                                        &index_in_current_drs));
+    has_index_in_current_drs = true;
+  }
+
+  if (new_redos_head != nullptr) {
+    // Append REDO deltas to DiskRowSetWriter output.
+    RETURN_NOT_OK(out->AppendRedoDeltas(dst_row->row_index(),
+                                        new_redos_head,
+                                        &index_in_current_drs));
+    has_index_in_current_drs = true;
+  }
+
+  if (has_index_in_current_drs) {
+    DVLOG(4) << "Output Row: " << dst_row->schema()->DebugRow(*dst_row)
+             << "; RowId: " << index_in_current_drs;
+  } else {
+    DVLOG(4) << "Output Row: " << dst_row->schema()->DebugRow(*dst_row)
+             << "; RowId: <not reported: no deltas appended>";
+  }
+
+  return Status::OK();
+}
+
+#ifndef NDEBUG
+// Debug-only validation of a finished UNDO list, walked from its head.
+// Crashes via CHECK unless all of the following hold:
+//  - a DELETE is only observed while the row is not already deleted,
+//  - a REINSERT is only observed while the row is deleted,
+//  - timestamps never increase from one mutation to the next.
+static void UndoListSanityCheck(Mutation* new_undos_head) {
+  bool is_deleted = false;
+  for (auto* u = new_undos_head; u != nullptr; u = u->next()) {
+    if (u->changelist().is_delete()) {
+      CHECK(!is_deleted);
+      is_deleted = true;
+    } else if (u->changelist().is_reinsert()) {
+      CHECK(is_deleted);
+      is_deleted = false;
+    }
+    // Ordering check against the successor, if there is one.
+    if (u->next() != nullptr) {
+      CHECK_GE(u->timestamp(), u->next()->timestamp());
+    }
+  }
+}
+#endif // NDEBUG
+
+// Processes a single compaction input row 'input_row':
+//  - copies its base row into slot 'cur_row_idx' of 'block',
+//  - applies its REDO mutations to that copy, generating the matching UNDOs,
+//  - merges in the mutation histories of any 'previous_ghost' entries,
+//  - removes ancient UNDOs and decides whether the row is garbage collected.
+// For a row that is not garbage collected, this function also:
+//  - appends the row's UNDO/REDO deltas to 'out',
+//  - increments '*live_row_count' when no REDO remains,
+//  - in debug builds, sanity-checks the resulting UNDO list.
+// '*is_garbage_collected' receives the result of RemoveAncientUndos(); when it
+// is true, nothing for this row is appended to 'out'.
+static Status ApplyMutationsAndMergeDuplicateHistory(
+    const MvccSnapshot& snap,
+    const CompactionInputRow& input_row,
+    size_t cur_row_idx,
+    RowBlock* block,
+    Arena* arena,
+    const string& tablet_id,
+    const scoped_refptr<FsErrorManager>& err_mgr,
+    const HistoryGcOpts& history_gc_opts,
+    RollingDiskRowSetWriter* out,
+    int* live_row_count,
+    bool* is_garbage_collected) {
+  RETURN_NOT_OK(out->RollIfNecessary());
+
+  const Schema* schema = input_row.row.schema();
+  DCHECK_SCHEMA_EQ(*schema, out->schema());
+  DCHECK(schema->has_column_ids());
+
+  DVLOG(4) << "Input Row: " << CompactionInputRowToString(input_row);
+
+  // Start from a copy of the base row; REDOs are applied on top of it.
+  RowBlockRow dst_row = block->row(cur_row_idx);
+  RETURN_NOT_OK(CopyRow(input_row.row, &dst_row, static_cast<Arena*>(nullptr)));
+
+  Mutation* new_undos_head = nullptr;
+  Mutation* new_redos_head = nullptr;
+
+  // REDOs -> UNDOs for this row only; 'previous_ghost' members are not
+  // considered here and are merged in by the next step.
+  RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(
+      snap, input_row, arena, history_gc_opts, &dst_row,
+      &new_undos_head, &new_redos_head));
+
+  RETURN_NOT_OK(MergeDuplicatedRowHistory(
+      tablet_id, err_mgr, input_row, arena, history_gc_opts, &new_undos_head));
+
+  *is_garbage_collected =
+      RemoveAncientUndos(history_gc_opts, new_redos_head, &new_undos_head);
+
+  DVLOG(4) << "Output Row: " << RowToString(dst_row, new_redos_head, new_undos_head) <<
+      "; Was garbage collected? " << *is_garbage_collected;
+
+  if (*is_garbage_collected) {
+    // Nothing of this row is written to the output.
+    return Status::OK();
+  }
+
+  RETURN_NOT_OK(AppendDeltasToDRS(out, new_undos_head, new_redos_head, &dst_row));
+
+  // At most one REDO (a DELETE) can remain; with none, the row is live.
+  if (new_redos_head == nullptr) {
+    ++(*live_row_count);
+  }
+
+#ifndef NDEBUG
+  UndoListSanityCheck(new_undos_head);
+#endif // NDEBUG
+
+  return Status::OK();
+}
+
// Following method processes the compaction input by reading input rows in
// blocks and for each row inside the block:
// - Apply all REDO mutations collected for the row at hand.
@@ -1414,110 +1541,38 @@ Status FlushCompactionInput(const string& tablet_id,
while (input->HasMoreBlocks()) {
RETURN_NOT_OK(input->PrepareBlock(&rows));
- int n = 0;
+ size_t cur_row_idx = 0;
int live_row_count = 0;
- for (int i = 0; i < rows.size(); i++) {
- CompactionInputRow* input_row = &rows[i];
- RETURN_NOT_OK(out->RollIfNecessary());
-
- const Schema* schema = input_row->row.schema();
- DCHECK_SCHEMA_EQ(*schema, out->schema());
- DCHECK(schema->has_column_ids());
-
- RowBlockRow dst_row = block.row(n);
- RETURN_NOT_OK(CopyRow(input_row->row, &dst_row,
static_cast<Arena*>(nullptr)));
-
- DVLOG(4) << "Input Row: " << CompactionInputRowToString(*input_row);
-
- // Collect the new UNDO/REDO mutations.
- Mutation* new_undos_head = nullptr;
- Mutation* new_redos_head = nullptr;
-
- // Apply all REDOs to the base row, generating UNDOs for it. This does
- // not take into account any 'previous_ghost' members.
- RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(snap,
- *input_row,
- &new_undos_head,
- &new_redos_head,
- input->PreparedBlockArena(),
- &dst_row,
- history_gc_opts));
-
- // Merge the histories of 'input_row' with previous ghosts, if there are
any.
- RETURN_NOT_OK(MergeDuplicatedRowHistory(tablet_id,
- error_manager,
- input_row,
- &new_undos_head,
- input->PreparedBlockArena(),
- history_gc_opts));
-
- // Remove ancient UNDOS and check whether the row should be garbage
collected.
+ for (const auto& row : rows) {
bool is_garbage_collected = false;
- RemoveAncientUndos(history_gc_opts,
- &new_undos_head,
- new_redos_head,
- &is_garbage_collected);
-
- DVLOG(4) << "Output Row: " << RowToString(dst_row, new_redos_head,
new_undos_head) <<
- "; Was garbage collected? " << is_garbage_collected;
+ RETURN_NOT_OK(ApplyMutationsAndMergeDuplicateHistory(snap,
+ row,
+ cur_row_idx,
+ &block,
+
input->PreparedBlockArena(),
+ tablet_id,
+ error_manager,
+ history_gc_opts,
+ out,
+ &live_row_count,
+
&is_garbage_collected));
// Whether this row was garbage collected
if (is_garbage_collected) {
// Don't flush the row.
continue;
}
- rowid_t index_in_current_drs;
-
- if (new_undos_head != nullptr) {
- // Append UNDO deltas to DiskRowSetWriter output.
- out->AppendUndoDeltas(dst_row.row_index(), new_undos_head,
&index_in_current_drs);
- }
-
- if (new_redos_head != nullptr) {
- // Append REDO deltas to DiskRowSetWriter output.
- out->AppendRedoDeltas(dst_row.row_index(), new_redos_head,
&index_in_current_drs);
- }
-
- // If the REDO is empty, it should not be a DELETE.
- if (new_redos_head == nullptr) {
- live_row_count++;
- }
-
- DVLOG(4) << "Output Row: " << dst_row.schema()->DebugRow(dst_row)
- << "; RowId: " << index_in_current_drs;
-#ifndef NDEBUG
- auto* u = new_undos_head;
- bool is_deleted = false;
- // The resulting list should have the following invariants:
- // - deletes can only be observed if not already deleted
- // - reinserts can only be observed if deleted
- // - UNDO mutations are in decreasing order
- while (u != nullptr) {
- if (u->changelist().is_delete()) {
- CHECK(!is_deleted);
- is_deleted = true;
- } else if (u->changelist().is_reinsert()) {
- CHECK(is_deleted);
- is_deleted = false;
- }
- if (!u->next()) break;
- CHECK_GE(u->timestamp(), u->next()->timestamp());
- u = u->next();
- }
-#endif // NDEBUG
-
- n++;
- if (n == block.nrows()) {
+ if (++cur_row_idx == block.nrows()) {
// Append fully processed rowblock to DRS writer output.
RETURN_NOT_OK(out->AppendBlock(block, live_row_count));
live_row_count = 0;
- n = 0;
+ cur_row_idx = 0;
}
}
- if (n > 0) {
- block.Resize(n);
+ if (cur_row_idx > 0) {
+ block.Resize(cur_row_idx);
// Append partially (resized) processed rowblock to DRS writer output.
RETURN_NOT_OK(out->AppendBlock(block, live_row_count));
block.Resize(block.row_capacity());
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index 0404e06ca..67c42587c 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -203,16 +203,15 @@ struct CompactionInputRow {
// Function shared by flushes and compactions. Removes UNDO Mutations
// considered "ancient" from the given CompactionInputRow, modifying the undo
// mutation list in-place.
-// 'is_garbage_collected': Set to true if the row was marked as deleted prior
-// to the ancient history mark, with no reinsertions after that. In such a
-// case, all traces of the row should be removed from disk by the caller.
+// Return true if the row was marked as deleted prior to the ancient history
mark,
+// with no reinsertions after that. In such a case, all traces of the row
should
+// be removed from disk by the caller.
//
// This is supposed to be called after ApplyMutationsAndGenerateUndos() where
REDOS
// are transformed in UNDOs. There can be at most one REDO in 'redo_head', a
DELETE.
-void RemoveAncientUndos(const HistoryGcOpts& history_gc_opts,
- Mutation** undo_head,
+bool RemoveAncientUndos(const HistoryGcOpts& history_gc_opts,
const Mutation* redo_head,
- bool* is_garbage_collected);
+ Mutation** undo_head);
// Function shared by flushes, compactions and major delta compactions.
Applies all the REDO
// mutations from 'src_row' to the 'dst_row', and generates the related UNDO
mutations. Some
@@ -224,11 +223,11 @@ void RemoveAncientUndos(const HistoryGcOpts&
history_gc_opts,
// ignored.
Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& snap,
const CompactionInputRow& src_row,
- Mutation** new_undo_head,
- Mutation** new_redo_head,
Arena* arena,
+ const HistoryGcOpts& history_gc_opts,
RowBlockRow* dst_row,
- const HistoryGcOpts& history_gc_opts);
+ Mutation** new_undo_head,
+ Mutation** new_redo_head);
// Iterate through this compaction input, flushing all rows to the given
RollingDiskRowSetWriter.
// The 'snap' argument should match the MvccSnapshot used to create the
compaction input.
diff --git a/src/kudu/tablet/delta_compaction.cc
b/src/kudu/tablet/delta_compaction.cc
index 6a582530d..dc612ef1c 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -179,16 +179,16 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const
IOContext* io_context) {
DVLOG(3) << "MDC Input Row - RowId: " << row_id << " "
<< CompactionInputRowToString(*input_row);
- // NOTE: This is presently ignored.
- bool is_garbage_collected;
-
- RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(
- snap, *input_row, &new_undos_head, &new_redos_head, &mem.arena,
- &dst_row, history_gc_opts_));
+ RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(snap,
+ *input_row,
+ &mem.arena,
+ history_gc_opts_,
+ &dst_row,
+ &new_undos_head,
+ &new_redos_head));
RemoveAncientUndos(history_gc_opts_,
- &new_undos_head,
new_redos_head,
- &is_garbage_collected);
+ &new_undos_head);
DVLOG(3) << "MDC Output Row - RowId: " << row_id << " "
<< RowToString(dst_row, new_undos_head, new_redos_head);