This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 63dbe6f tablet: fix handling of ghost rows when merging MRSs
63dbe6f is described below
commit 63dbe6f9be3399ea83f6ffda5f6de1d84d5c8b96
Author: Andrew Wong <[email protected]>
AuthorDate: Fri Oct 23 14:01:35 2020 -0700
tablet: fix handling of ghost rows when merging MRSs
We previously designated nullptr as the memory backing an
MRS's compaction input RowBlock during compaction. A memory
buffer was available, but the RowBlock was never pointed at it
correctly, even though the rest of the compaction code used the correct
memory buffer.
This is inconsequential today because we only try to dereference the
memory buffer from the RowBlock when trying to merge histories of rows
that are strewn across multiple rowsets (e.g. from deletes followed by
inserts of the same rows). Today, we never merge multiple MRSs; we only
merge multiple DRSs, or flush a single MRS. However, I have a patch
coming up that will exercise this.
F1023 16:38:14.955049 40525 compaction.cc:672] Check failed: arena !=
nullptr Arena can't be null
*** Check failure stack trace: ***
*** Aborted at 1603496294 (unix time) try "date -d @1603496294" if you are
using GNU date ***
PC: @ 0x7f86791fc1d7 __GI_raise
*** SIGABRT (@0x111700009e4d) received by PID 40525 (TID 0x7f86866719c0)
from PID 40525; stack trace: ***
@ 0x7f8682718370 (unknown)
@ 0x7f86791fc1d7 __GI_raise
@ 0x7f86791fd8c8 __GI_abort
@ 0x7f867bd847b9 google::logging_fail()
@ 0x7f867bd85f8d google::LogMessage::Fail()
@ 0x7f867bd87ee3 google::LogMessage::SendToLog()
@ 0x7f867bd85ae9 google::LogMessage::Flush()
@ 0x7f867bd8886f google::LogMessageFatal::~LogMessageFatal()
@ 0x7f8685523a59 kudu::tablet::(anonymous
namespace)::MergeCompactionInput::SetPreviousGhost()
@ 0x7f8685522b45 kudu::tablet::(anonymous
namespace)::MergeCompactionInput::PrepareBlock()
@ 0x7f8685527c08 kudu::tablet::FlushCompactionInput()
@ 0x47f047 kudu::tablet::TestCompaction::DoFlushAndReopen()
@ 0x46a954
kudu::tablet::TestCompaction_TestMergeMRS_Test::TestBody()
@ 0x7f86859c6b39
testing::internal::HandleExceptionsInMethodIfSupported<>()
@ 0x7f86859b804f testing::Test::Run()
@ 0x7f86859b810d testing::TestInfo::Run()
@ 0x7f86859b8225 testing::TestCase::Run()
@ 0x7f86859b84e8 testing::internal::UnitTestImpl::RunAllTests()
@ 0x7f86859b8789 testing::UnitTest::Run()
@ 0x7f8686070cbd RUN_ALL_TESTS()
@ 0x7f868606ec91 main
@ 0x7f86791e8b35 __libc_start_main
@ 0x460df9 (unknown)
Aborted
Change-Id: I539b9f67ccb2e05cac7eccf75fce89f0359c8ca9
Reviewed-on: http://gerrit.cloudera.org:8080/16643
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/common/rowblock.h | 2 +-
src/kudu/tablet/compaction-test.cc | 31 +++++++++++++++-----------
src/kudu/tablet/compaction.cc | 45 ++++++++++++++++++++++++--------------
3 files changed, 48 insertions(+), 30 deletions(-)
diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h
index 5dd5827..a6a7701 100644
--- a/src/kudu/common/rowblock.h
+++ b/src/kudu/common/rowblock.h
@@ -329,7 +329,7 @@ class RowBlock {
RowBlockRow row(size_t idx) const;
const Schema* schema() const { return schema_; }
- Arena* arena() const { return &memory_->arena; }
+ Arena* arena() const { return &DCHECK_NOTNULL(memory_)->arena; }
ColumnBlock column_block(size_t col_idx) const {
return column_block(col_idx, nrows_);
diff --git a/src/kudu/tablet/compaction-test.cc
b/src/kudu/tablet/compaction-test.cc
index 58d84f5..3dae68d 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -1026,9 +1026,7 @@ TEST_F(TestCompaction, TestMergeMultipleSchemas) {
DoMerge(schemas.back(), schemas);
}
-// Test MergeCompactionInput against MemRowSets. This behavior isn't currently
-// used (we never compact in-memory), but this is a regression test for a bug
-// encountered during development where the first row of each MRS got dropped.
+// Test MergeCompactionInput against MemRowSets.
TEST_F(TestCompaction, TestMergeMRS) {
shared_ptr<MemRowSet> mrs_a;
ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
@@ -1040,6 +1038,14 @@ TEST_F(TestCompaction, TestMergeMRS) {
mem_trackers_.tablet_tracker, &mrs_b));
InsertRows(mrs_b.get(), 10, 1);
+ // While we're at it, let's strew some rows' histories across both rowsets.
+ // This will create ghost rows in the compaction inputs and help validate
+ // some of the ghost-row handling applied during compaction.
+ DeleteRows(mrs_a.get(), 5, 0);
+ InsertRows(mrs_b.get(), 5, 0);
+ DeleteRows(mrs_b.get(), 5, 1);
+ InsertRows(mrs_a.get(), 5, 1);
+
MvccSnapshot snap(mvcc_);
vector<shared_ptr<CompactionInput> > merge_inputs;
merge_inputs.push_back(
@@ -1047,16 +1053,15 @@ TEST_F(TestCompaction, TestMergeMRS) {
merge_inputs.push_back(
shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_,
snap)));
unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs,
&schema_));
-
- vector<string> out;
- IterateInput(input.get(), &out);
- ASSERT_EQ(out.size(), 20);
- EXPECT_EQ(R"(RowIdxInBlock: 0; Base: (string key="hello 00000000", int32
val=0, )"
- "int32 nullable_val=0); Undo Mutations: [@1(DELETE)]; "
- "Redo Mutations: [];", out[0]);
- EXPECT_EQ(R"(RowIdxInBlock: 9; Base: (string key="hello 00000091", int32
val=9, )"
- "int32 nullable_val=NULL); Undo Mutations: [@20(DELETE)]; "
- "Redo Mutations: [];", out[19]);
+ vector<shared_ptr<DiskRowSet>> result_rs;
+ DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold,
&result_rs);
+ int total_num_rows = 0;
+ for (const auto& rs : result_rs) {
+ size_t rs_live_rows;
+ ASSERT_OK(rs->CountLiveRows(&rs_live_rows));
+ total_num_rows += rs_live_rows;
+ }
+ ASSERT_EQ(20, total_num_rows);
}
#ifdef NDEBUG
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index 8b116ce..f6a8804 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -104,7 +104,7 @@ class MemRowSetCompactionInput : public CompactionInput {
MemRowSetCompactionInput(const MemRowSet& memrowset,
const MvccSnapshot& snap,
const Schema* projection)
- : arena_(32*1024),
+ : mem_(32*1024),
has_more_blocks_(false) {
RowIteratorOptions opts;
opts.projection = projection;
@@ -129,10 +129,10 @@ class MemRowSetCompactionInput : public CompactionInput {
// Realloc the internal block storage if we don't have enough space to
// copy the whole leaf node's worth of data into it.
if (PREDICT_FALSE(!row_block_ || num_in_block > row_block_->nrows())) {
- row_block_.reset(new RowBlock(&iter_->schema(), num_in_block, nullptr));
+ row_block_.reset(new RowBlock(&iter_->schema(), num_in_block, &mem_));
}
- arena_.Reset();
+ mem_.arena.Reset();
RowChangeListEncoder undo_encoder(&buffer_);
int next_row_index = 0;
for (int i = 0; i < num_in_block; ++i) {
@@ -143,7 +143,7 @@ class MemRowSetCompactionInput : public CompactionInput {
RETURN_NOT_OK(iter_->GetCurrentRow(&input_row.row,
static_cast<Arena*>(nullptr),
&input_row.redo_head,
- &arena_,
+ &mem_.arena,
&insertion_timestamp));
// Handle the rare case where a row was inserted and deleted in the same
operation.
@@ -165,7 +165,7 @@ class MemRowSetCompactionInput : public CompactionInput {
// Materialize MRSRow undo insert (delete)
undo_encoder.SetToDelete();
- input_row.undo_head = Mutation::CreateInArena(&arena_,
+ input_row.undo_head = Mutation::CreateInArena(&mem_.arena,
insertion_timestamp,
undo_encoder.as_changelist());
undo_encoder.Reset();
@@ -181,7 +181,7 @@ class MemRowSetCompactionInput : public CompactionInput {
return Status::OK();
}
- Arena* PreparedBlockArena() override { return &arena_; }
+ Arena* PreparedBlockArena() override { return &mem_.arena; }
Status FinishBlock() override {
return Status::OK();
@@ -197,8 +197,8 @@ class MemRowSetCompactionInput : public CompactionInput {
unique_ptr<MemRowSet::Iterator> iter_;
- // Arena used to store the projected undo/redo mutations of the current
block.
- Arena arena_;
+ // Memory used to store the projected undo/redo mutations of the current
block.
+ RowBlockMemory mem_;
faststring buffer_;
@@ -502,8 +502,9 @@ class MergeCompactionInput : public CompactionInput {
<< CompactionInputRowToString(*smallest);
continue;
}
- // If we found two rows with the same key, we want to make the newer
one point to the older
- // one, which must be a ghost.
+
+ // If we found two rows with the same key, we want to make the newer
+ // one point to the older one, which must be a ghost.
if (PREDICT_FALSE(row_comp == 0)) {
DVLOG(4) << "Duplicate row.\nLeft: " <<
CompactionInputRowToString(*state->next())
<< "\nRight: " << CompactionInputRowToString(*smallest);
@@ -523,7 +524,7 @@ class MergeCompactionInput : public CompactionInput {
}
// .. otherwise copy and pop the other one.
RETURN_NOT_OK(SetPreviousGhost(smallest, state->next(), true /*
clone */,
- smallest->row.row_block()->arena()));
+
DCHECK_NOTNULL(smallest)->row.row_block()->arena()));
DVLOG(4) << "Updated left duplicate smallest: "
<< CompactionInputRowToString(*smallest);
states_[i]->pop_front();
@@ -658,8 +659,12 @@ class MergeCompactionInput : public CompactionInput {
return Status::OK();
}
- // Sets the previous ghost row for a CompactionInputRow.
- // 'must_copy' indicates whether there must be a deep copy (using
CloneCompactionInputRow()).
+ // Merges the 'previous_ghost' histories of 'older' and 'newer' such that
+ // 'older->previous_ghost' is the head of a list of rows in increasing
+ // timestamp order (deltas get newer down the list).
+ //
+ // 'must_copy' indicates whether there must be a deep copy (using
+ // CloneCompactionInputRow()).
Status SetPreviousGhost(CompactionInputRow* older,
CompactionInputRow* newer,
bool must_copy,
@@ -669,7 +674,7 @@ class MergeCompactionInput : public CompactionInput {
// recent.
if (older->previous_ghost != nullptr) {
if (CompareDuplicatedRows(*older->previous_ghost, *newer) > 0) {
- // 'older' was more recent.
+ // 'older->previous_ghost' was more recent.
return SetPreviousGhost(older->previous_ghost, newer, must_copy /*
clone */, arena);
}
// 'newer' was more recent.
@@ -766,8 +771,8 @@ Mutation* MergeUndoHistories(Mutation* left, Mutation*
right) {
return head;
}
-// If 'old_row' has previous versions, this transforms prior version in undos
and adds them
-// to 'new_undo_head'.
+// If 'old_row' has previous versions, this transforms prior version in undos
+// and adds them to 'new_undo_head'.
Status MergeDuplicatedRowHistory(const string& tablet_id,
const FsErrorManager* error_manager,
CompactionInputRow* old_row,
@@ -992,6 +997,12 @@ void RemoveAncientUndos(const HistoryGcOpts&
history_gc_opts,
}
}
+// Applies the REDOs of 'src_row' in accordance with the input snapshot,
+// returning the result in 'dst_row', and converting those REDOs to UNDOs,
+// returned via 'new_undo_head' and any remaining REDO (e.g. a delete) in
+// 'new_redo_head'.
+//
+// NOTE: input REDOs are expected to be in increasing timestamp order.
Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& snap,
const CompactionInputRow& src_row,
Mutation** new_undo_head,
@@ -1152,6 +1163,8 @@ Status FlushCompactionInput(const string& tablet_id,
Mutation* new_undos_head = nullptr;
Mutation* new_redos_head = nullptr;
+ // Apply all REDOs to the base row, generating UNDOs for it. This does
+ // not take into account any 'previous_ghost' members.
RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(snap,
*input_row,
&new_undos_head,