Repository: kudu Updated Branches: refs/heads/master 8ff4d1211 -> eb4b46b80
Don't output unobservable rows from the MemRowSet. In some rare cases we might have a row that is inserted and deleted in the same operation, meaning the insert and delete have the same timestamp. In this rare case we might run into trouble later on with compactions as there is a sequence of operations that produces two ghost rows which are incomparable in terms of recency. The sequence is as follows: Insert Row at 1 -> Row goes in the MRS Flush -> Row now lives in RS1 Delete Row at 3 -> Row is now a ghost in RS1 Insert Row at 3 -> Row goes in the MRS again Delete Row at 3 -> Row is deleted from the MRS Flush -> Row is now a ghost in RS2 Major delta compact RS1 -> GC all before 3 Major delta compact RS2 -> GC all before 3 If RS1 and RS2 now get compacted together there is no way to distinguish Ghost 1 from Ghost 2 and to build a correct history for the row. The point is that we can't trust UNDOs to break ties, in terms of recency, between two ghosts as they might have been removed by delta compactions. However, we can always trust REDO delete/reinsert to remain. This adds a small test to compaction-test that makes sure that a row that is inserted and deleted in the same transaction doesn't appear in the compaction input. FuzzTest and TestRandomAccess would fail for the following patch (KUDU-237 (part 2)) without this change. 
Change-Id: Ieab52a1aa68494218f91f3acd31ef8ddf352bd57 Reviewed-on: http://gerrit.cloudera.org:8080/4994 Tested-by: Kudu Jenkins Reviewed-by: David Ribeiro Alves <[email protected]> Reviewed-by: Mike Percy <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ecff49af Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ecff49af Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ecff49af Branch: refs/heads/master Commit: ecff49afb95de2cea37987797fe0c115920d2a41 Parents: 8ff4d12 Author: David Alves <[email protected]> Authored: Mon Nov 7 23:00:59 2016 -0800 Committer: David Ribeiro Alves <[email protected]> Committed: Wed Nov 30 10:14:19 2016 +0000 ---------------------------------------------------------------------- src/kudu/tablet/compaction-test.cc | 208 +++++++++++++++++++++++--------- src/kudu/tablet/compaction.cc | 39 +++++- 2 files changed, 185 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/ecff49af/src/kudu/tablet/compaction-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc index 5b8d160..3d23c29 100644 --- a/src/kudu/tablet/compaction-test.cc +++ b/src/kudu/tablet/compaction-test.cc @@ -97,6 +97,11 @@ class TestCompaction : public KuduRowSetTest { void InsertRow(MemRowSet *mrs, int row_key, int32_t val) { ScopedTransaction tx(&mvcc_, clock_->Now()); tx.StartApplying(); + InsertRowInTransaction(mrs, tx, row_key, val); + tx.Commit(); + } + + void BuildRow(int row_key, int32_t val) { row_builder_.Reset(); snprintf(key_buf_, sizeof(key_buf_), kRowKeyFormat, row_key); row_builder_.AddString(Slice(key_buf_)); @@ -106,6 +111,13 @@ class TestCompaction : public KuduRowSetTest { } else { row_builder_.AddNull(); } + } + + void InsertRowInTransaction(MemRowSet *mrs, + 
const ScopedTransaction& txn, + int row_key, + int32_t val) { + BuildRow(row_key, val); if (!mrs->schema().Equals(row_builder_.schema())) { // The MemRowSet is not projecting the row, so must be done by the caller RowProjector projector(&row_builder_.schema(), &mrs->schema()); @@ -113,12 +125,11 @@ class TestCompaction : public KuduRowSetTest { ContiguousRow dst_row(&mrs->schema(), rowbuf); ASSERT_OK_FAST(projector.Init()); ASSERT_OK_FAST(projector.ProjectRowForWrite(row_builder_.row(), - &dst_row, static_cast<Arena*>(nullptr))); - ASSERT_OK_FAST(mrs->Insert(tx.timestamp(), ConstContiguousRow(dst_row), op_id_)); + &dst_row, static_cast<Arena*>(nullptr))); + ASSERT_OK_FAST(mrs->Insert(txn.timestamp(), ConstContiguousRow(dst_row), op_id_)); } else { - ASSERT_OK_FAST(mrs->Insert(tx.timestamp(), row_builder_.row(), op_id_)); + ASSERT_OK_FAST(mrs->Insert(txn.timestamp(), row_builder_.row(), op_id_)); } - tx.Commit(); } // Update n_rows rows of data. @@ -128,70 +139,90 @@ class TestCompaction : public KuduRowSetTest { // Note that this is the opposite of InsertRow() above, so that the updates // flop NULL to non-NULL and vice versa. 
void UpdateRows(RowSet *rowset, int n_rows, int delta, int32_t new_val) { - char keybuf[256]; - faststring update_buf; - ColumnId col_id = schema_.column_id(schema_.find_column("val")); - ColumnId nullable_col_id = schema_.column_id(schema_.find_column("nullable_val")); for (uint32_t i = 0; i < n_rows; i++) { SCOPED_TRACE(i); - ScopedTransaction tx(&mvcc_, clock_->Now()); - tx.StartApplying(); - snprintf(keybuf, sizeof(keybuf), kRowKeyFormat, i * 10 + delta); - - update_buf.clear(); - RowChangeListEncoder update(&update_buf); - update.AddColumnUpdate(schema_.column_by_id(col_id), col_id, &new_val); - if (new_val % 2 == 0) { - update.AddColumnUpdate(schema_.column_by_id(nullable_col_id), - nullable_col_id, nullptr); - } else { - update.AddColumnUpdate(schema_.column_by_id(nullable_col_id), - nullable_col_id, &new_val); - } - - RowBuilder rb(schema_.CreateKeyProjection()); - rb.AddString(Slice(keybuf)); - RowSetKeyProbe probe(rb.row()); - ProbeStats stats; - OperationResultPB result; - ASSERT_OK(rowset->MutateRow(tx.timestamp(), - probe, - RowChangeList(update_buf), - op_id_, - &stats, - &result)); - tx.Commit(); + UpdateRow(rowset, i * 10 + delta, new_val); } } - void DeleteRows(RowSet *rowset, int n_rows, int delta) { + void UpdateRow(RowSet *rowset, int row_key, int32_t new_val) { + ScopedTransaction tx(&mvcc_, clock_->Now()); + tx.StartApplying(); + UpdateRowInTransaction(rowset, tx, row_key, new_val); + tx.Commit(); + } + + void UpdateRowInTransaction(RowSet *rowset, + const ScopedTransaction& txn, + int row_key, + int32_t new_val) { + ColumnId col_id = schema_.column_id(schema_.find_column("val")); + ColumnId nullable_col_id = schema_.column_id(schema_.find_column("nullable_val")); + char keybuf[256]; faststring update_buf; + snprintf(keybuf, sizeof(keybuf), kRowKeyFormat, row_key); + + update_buf.clear(); + RowChangeListEncoder update(&update_buf); + update.AddColumnUpdate(schema_.column_by_id(col_id), col_id, &new_val); + if (new_val % 2 == 0) { + 
update.AddColumnUpdate(schema_.column_by_id(nullable_col_id), + nullable_col_id, nullptr); + } else { + update.AddColumnUpdate(schema_.column_by_id(nullable_col_id), + nullable_col_id, &new_val); + } + + RowBuilder rb(schema_.CreateKeyProjection()); + rb.AddString(Slice(keybuf)); + RowSetKeyProbe probe(rb.row()); + ProbeStats stats; + OperationResultPB result; + ASSERT_OK(rowset->MutateRow(txn.timestamp(), + probe, + RowChangeList(update_buf), + op_id_, + &stats, + &result)); + } + + void DeleteRows(RowSet* rowset, int n_rows) { + faststring update_buf; for (uint32_t i = 0; i < n_rows; i++) { SCOPED_TRACE(i); - ScopedTransaction tx(&mvcc_, clock_->Now()); - tx.StartApplying(); - snprintf(keybuf, sizeof(keybuf), kRowKeyFormat, i * 10 + delta); - - update_buf.clear(); - RowChangeListEncoder update(&update_buf); - update.SetToDelete(); - - RowBuilder rb(schema_.CreateKeyProjection()); - rb.AddString(Slice(keybuf)); - RowSetKeyProbe probe(rb.row()); - ProbeStats stats; - OperationResultPB result; - ASSERT_OK(rowset->MutateRow(tx.timestamp(), - probe, - RowChangeList(update_buf), - op_id_, - &stats, - &result)); - tx.Commit(); + DeleteRow(rowset, i * 10); } } + void DeleteRow(RowSet* rowset, int row_key) { + ScopedTransaction tx(&mvcc_, clock_->Now()); + tx.StartApplying(); + DeleteRowInTransaction(rowset, tx, row_key); + tx.Commit(); + } + + void DeleteRowInTransaction(RowSet *rowset, const ScopedTransaction& txn, int row_key) { + char keybuf[256]; + faststring update_buf; + snprintf(keybuf, sizeof(keybuf), kRowKeyFormat, row_key); + update_buf.clear(); + RowChangeListEncoder update(&update_buf); + update.SetToDelete(); + + RowBuilder rb(schema_.CreateKeyProjection()); + rb.AddString(Slice(keybuf)); + RowSetKeyProbe probe(rb.row()); + ProbeStats stats; + OperationResultPB result; + ASSERT_OK(rowset->MutateRow(txn.timestamp(), + probe, + RowChangeList(update_buf), + op_id_, + &stats, + &result)); + } + // Iterate over the given compaction input, stringifying and dumping 
each // yielded row to *out void IterateInput(CompactionInput *input, vector<string> *out) { @@ -512,7 +543,7 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsDontSurviveCompaction) { } // Now delete the rows, this will make the rs report them as deleted and // so we would reinsert them into the MRS. - DeleteRows(rs1.get(), 10, 0); + DeleteRows(rs1.get(), 10); shared_ptr<DiskRowSet> rs2; { @@ -524,7 +555,7 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsDontSurviveCompaction) { FlushMRSAndReopenNoRoll(*mrs, schema_, &rs2); ASSERT_NO_FATAL_FAILURE(); } - DeleteRows(rs2.get(), 10, 0); + DeleteRows(rs2.get(), 10); shared_ptr<DiskRowSet> rs3; { @@ -566,6 +597,67 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsDontSurviveCompaction) { "@60(DELETE)]; Redo Mutations: [];", out[9]); } +// Test case that inserts and deletes a row in the same transaction and makes sure +// the row isn't on the compaction input. +TEST_F(TestCompaction, TestMRSCompactionDoesntOutputUnobservableRows) { + // Insert a row in the mrs and flush it. + shared_ptr<DiskRowSet> rs1; + { + shared_ptr<MemRowSet> mrs; + ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(), + mem_trackers_.tablet_tracker, &mrs)); + InsertRow(mrs.get(), 1, 1); + FlushMRSAndReopenNoRoll(*mrs, schema_, &rs1); + ASSERT_NO_FATAL_FAILURE(); + } + + // Now make the row a ghost in rs1 in the same transaction as we reinsert it in the mrs then + // flush it. Also insert another row so that the row set isn't completely empty (otherwise + // it would disappear on flush). 
+ shared_ptr<DiskRowSet> rs2; + { + shared_ptr<MemRowSet> mrs; + ASSERT_OK(MemRowSet::Create(1, schema_, log_anchor_registry_.get(), + mem_trackers_.tablet_tracker, &mrs)); + ScopedTransaction tx(&mvcc_, clock_->Now()); + tx.StartApplying(); + DeleteRowInTransaction(rs1.get(), tx, 1); + InsertRowInTransaction(mrs.get(), tx, 1, 2); + UpdateRowInTransaction(mrs.get(), tx, 1, 3); + DeleteRowInTransaction(mrs.get(), tx, 1); + + InsertRowInTransaction(mrs.get(), tx, 2, 0); + tx.Commit(); + FlushMRSAndReopenNoRoll(*mrs, schema_, &rs2); + ASSERT_NO_FATAL_FAILURE(); + } + + MvccSnapshot all_snap = MvccSnapshot::CreateSnapshotIncludingAllTransactions(); + + gscoped_ptr<CompactionInput> rs1_input; + ASSERT_OK(CompactionInput::Create(*rs1, &schema_, all_snap, &rs1_input)); + + gscoped_ptr<CompactionInput> rs2_input; + ASSERT_OK(CompactionInput::Create(*rs2, &schema_, all_snap, &rs2_input)); + + vector<shared_ptr<CompactionInput>> to_merge; + to_merge.push_back(shared_ptr<CompactionInput>(rs1_input.release())); + to_merge.push_back(shared_ptr<CompactionInput>(rs2_input.release())); + + gscoped_ptr<CompactionInput> merged(CompactionInput::Merge(to_merge, &schema_)); + + // Make sure the unobservable version of the row that was inserted and deleted in the MRS + // in the same transaction doesn't show up in the compaction input. + vector<string> out; + IterateInput(merged.get(), &out); + EXPECT_EQ(out.size(), 2); + EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=hello 00000001, int32 val=1, " + "int32 nullable_val=NULL); Undo Mutations: [@1(DELETE)]; Redo Mutations: " + "[@2(DELETE)];", out[0]); + EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=hello 00000002, int32 val=0, " + "int32 nullable_val=0); Undo Mutations: [@2(DELETE)]; Redo Mutations: [];", out[1]); +} + // Test case which doesn't do any merging -- just compacts // a single input rowset (which may be the memrowset) into a single // output rowset (on disk). 
http://git-wip-us.apache.org/repos/asf/kudu/blob/ecff49af/src/kudu/tablet/compaction.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc index 826d5b7..bec6615 100644 --- a/src/kudu/tablet/compaction.cc +++ b/src/kudu/tablet/compaction.cc @@ -50,6 +50,15 @@ namespace tablet { namespace { +// Advances to the last mutation in a mutation list. +void AdvanceToLastInList(const Mutation** m) { + if (*m == nullptr) return; + const Mutation* next; + while ((next = (*m)->acquire_next()) != nullptr) { + *m = next; + } +} + // CompactionInput yielding rows and mutations from a MemRowSet. class MemRowSetCompactionInput : public CompactionInput { public: @@ -83,10 +92,11 @@ class MemRowSetCompactionInput : public CompactionInput { arena_.Reset(); RowChangeListEncoder undo_encoder(&buffer_); - for (int i = 0; i < num_in_block; i++) { - // TODO: A copy is performed to make all CompactionInputRow have the same schema - CompactionInputRow &input_row = block->at(i); - input_row.row.Reset(row_block_.get(), i); + int next_row_index = 0; + for (int i = 0; i < num_in_block; ++i) { + // TODO(todd): A copy is performed to make all CompactionInputRow have the same schema + CompactionInputRow& input_row = block->at(next_row_index); + input_row.row.Reset(row_block_.get(), next_row_index); Timestamp insertion_timestamp; RETURN_NOT_OK(iter_->GetCurrentRow(&input_row.row, static_cast<Arena*>(nullptr), @@ -94,15 +104,36 @@ class MemRowSetCompactionInput : public CompactionInput { &arena_, &insertion_timestamp)); + // Handle the rare case where a row was inserted and deleted in the same operation. + // This row can never be observed and should not be compacted/flushed. This saves + // us some trouble later on on compactions. + // See: MergeCompactionInput::CompareAndMergeDuplicatedRows(). 
+ if (PREDICT_FALSE(input_row.redo_head != nullptr && + input_row.redo_head->timestamp() == insertion_timestamp)) { + // Get the latest mutation. + const Mutation* latest = input_row.redo_head; + AdvanceToLastInList(&latest); + if (latest->changelist().is_delete() && + latest->timestamp() == insertion_timestamp) { + iter_->Next(); + continue; + } + } + // Materialize MRSRow undo insert (delete) undo_encoder.SetToDelete(); input_row.undo_head = Mutation::CreateInArena(&arena_, insertion_timestamp, undo_encoder.as_changelist()); undo_encoder.Reset(); + ++next_row_index; iter_->Next(); } + if (PREDICT_FALSE(next_row_index < num_in_block)) { + block->resize(next_row_index); + } + has_more_blocks_ = iter_->HasNext(); return Status::OK(); }
