This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 63dbe6f  tablet: fix handling of ghost rows when merging MRSs
63dbe6f is described below

commit 63dbe6f9be3399ea83f6ffda5f6de1d84d5c8b96
Author: Andrew Wong <[email protected]>
AuthorDate: Fri Oct 23 14:01:35 2020 -0700

    tablet: fix handling of ghost rows when merging MRSs
    
    Previously, the RowBlock used as an MRS's compaction input was
    constructed with a nullptr RowBlockMemory, so it had no arena to point
    at during compaction. An arena was available, and the rest of the
    compaction code already used it, but the RowBlock itself was never
    pointed at it.
    
    This is inconsequential today because we only access the RowBlock's
    arena when merging the histories of rows that are strewn across
    multiple rowsets (e.g. from deletes followed by inserts of the same
    rows). Today, we never merge multiple MRSs; we only merge multiple DRSs
    or flush a single MRS. However, an upcoming patch will exercise this
    path.
    
    F1023 16:38:14.955049 40525 compaction.cc:672] Check failed: arena != nullptr Arena can't be null
    *** Check failure stack trace: ***
    *** Aborted at 1603496294 (unix time) try "date -d @1603496294" if you are using GNU date ***
    PC: @     0x7f86791fc1d7 __GI_raise
    *** SIGABRT (@0x111700009e4d) received by PID 40525 (TID 0x7f86866719c0) from PID 40525; stack trace: ***
        @     0x7f8682718370 (unknown)
        @     0x7f86791fc1d7 __GI_raise
        @     0x7f86791fd8c8 __GI_abort
        @     0x7f867bd847b9 google::logging_fail()
        @     0x7f867bd85f8d google::LogMessage::Fail()
        @     0x7f867bd87ee3 google::LogMessage::SendToLog()
        @     0x7f867bd85ae9 google::LogMessage::Flush()
        @     0x7f867bd8886f google::LogMessageFatal::~LogMessageFatal()
        @     0x7f8685523a59 kudu::tablet::(anonymous namespace)::MergeCompactionInput::SetPreviousGhost()
        @     0x7f8685522b45 kudu::tablet::(anonymous namespace)::MergeCompactionInput::PrepareBlock()
        @     0x7f8685527c08 kudu::tablet::FlushCompactionInput()
        @           0x47f047 kudu::tablet::TestCompaction::DoFlushAndReopen()
        @           0x46a954 kudu::tablet::TestCompaction_TestMergeMRS_Test::TestBody()
        @     0x7f86859c6b39 testing::internal::HandleExceptionsInMethodIfSupported<>()
        @     0x7f86859b804f testing::Test::Run()
        @     0x7f86859b810d testing::TestInfo::Run()
        @     0x7f86859b8225 testing::TestCase::Run()
        @     0x7f86859b84e8 testing::internal::UnitTestImpl::RunAllTests()
        @     0x7f86859b8789 testing::UnitTest::Run()
        @     0x7f8686070cbd RUN_ALL_TESTS()
        @     0x7f868606ec91 main
        @     0x7f86791e8b35 __libc_start_main
        @           0x460df9 (unknown)
    Aborted
    
    Change-Id: I539b9f67ccb2e05cac7eccf75fce89f0359c8ca9
    Reviewed-on: http://gerrit.cloudera.org:8080/16643
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/common/rowblock.h         |  2 +-
 src/kudu/tablet/compaction-test.cc | 31 +++++++++++++++-----------
 src/kudu/tablet/compaction.cc      | 45 ++++++++++++++++++++++++--------------
 3 files changed, 48 insertions(+), 30 deletions(-)

diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h
index 5dd5827..a6a7701 100644
--- a/src/kudu/common/rowblock.h
+++ b/src/kudu/common/rowblock.h
@@ -329,7 +329,7 @@ class RowBlock {
   RowBlockRow row(size_t idx) const;
 
   const Schema* schema() const { return schema_; }
-  Arena* arena() const { return &memory_->arena; }
+  Arena* arena() const { return &DCHECK_NOTNULL(memory_)->arena; }
 
   ColumnBlock column_block(size_t col_idx) const {
     return column_block(col_idx, nrows_);
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index 58d84f5..3dae68d 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -1026,9 +1026,7 @@ TEST_F(TestCompaction, TestMergeMultipleSchemas) {
   DoMerge(schemas.back(), schemas);
 }
 
-// Test MergeCompactionInput against MemRowSets. This behavior isn't currently
-// used (we never compact in-memory), but this is a regression test for a bug
-// encountered during development where the first row of each MRS got dropped.
+// Test MergeCompactionInput against MemRowSets.
 TEST_F(TestCompaction, TestMergeMRS) {
   shared_ptr<MemRowSet> mrs_a;
   ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
@@ -1040,6 +1038,14 @@ TEST_F(TestCompaction, TestMergeMRS) {
                               mem_trackers_.tablet_tracker, &mrs_b));
   InsertRows(mrs_b.get(), 10, 1);
 
+  // While we're at it, let's strew some rows' histories across both rowsets.
+  // This will create ghost rows in the compaction inputs and help validate
+  // some of the ghost-row handling applied during compaction.
+  DeleteRows(mrs_a.get(), 5, 0);
+  InsertRows(mrs_b.get(), 5, 0);
+  DeleteRows(mrs_b.get(), 5, 1);
+  InsertRows(mrs_a.get(), 5, 1);
+
   MvccSnapshot snap(mvcc_);
   vector<shared_ptr<CompactionInput> > merge_inputs;
   merge_inputs.push_back(
@@ -1047,16 +1053,15 @@ TEST_F(TestCompaction, TestMergeMRS) {
   merge_inputs.push_back(
         shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, 
snap)));
   unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, 
&schema_));
-
-  vector<string> out;
-  IterateInput(input.get(), &out);
-  ASSERT_EQ(out.size(), 20);
-  EXPECT_EQ(R"(RowIdxInBlock: 0; Base: (string key="hello 00000000", int32 
val=0, )"
-                "int32 nullable_val=0); Undo Mutations: [@1(DELETE)]; "
-                "Redo Mutations: [];", out[0]);
-  EXPECT_EQ(R"(RowIdxInBlock: 9; Base: (string key="hello 00000091", int32 
val=9, )"
-                "int32 nullable_val=NULL); Undo Mutations: [@20(DELETE)]; "
-                "Redo Mutations: [];", out[19]);
+  vector<shared_ptr<DiskRowSet>> result_rs;
+  DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, 
&result_rs);
+  int total_num_rows = 0;
+  for (const auto& rs : result_rs) {
+    size_t rs_live_rows;
+    ASSERT_OK(rs->CountLiveRows(&rs_live_rows));
+    total_num_rows += rs_live_rows;
+  }
+  ASSERT_EQ(20, total_num_rows);
 }
 
 #ifdef NDEBUG
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index 8b116ce..f6a8804 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -104,7 +104,7 @@ class MemRowSetCompactionInput : public CompactionInput {
   MemRowSetCompactionInput(const MemRowSet& memrowset,
                            const MvccSnapshot& snap,
                            const Schema* projection)
-    : arena_(32*1024),
+    : mem_(32*1024),
       has_more_blocks_(false) {
     RowIteratorOptions opts;
     opts.projection = projection;
@@ -129,10 +129,10 @@ class MemRowSetCompactionInput : public CompactionInput {
     // Realloc the internal block storage if we don't have enough space to
     // copy the whole leaf node's worth of data into it.
     if (PREDICT_FALSE(!row_block_ || num_in_block > row_block_->nrows())) {
-      row_block_.reset(new RowBlock(&iter_->schema(), num_in_block, nullptr));
+      row_block_.reset(new RowBlock(&iter_->schema(), num_in_block, &mem_));
     }
 
-    arena_.Reset();
+    mem_.arena.Reset();
     RowChangeListEncoder undo_encoder(&buffer_);
     int next_row_index = 0;
     for (int i = 0; i < num_in_block; ++i) {
@@ -143,7 +143,7 @@ class MemRowSetCompactionInput : public CompactionInput {
       RETURN_NOT_OK(iter_->GetCurrentRow(&input_row.row,
                                          static_cast<Arena*>(nullptr),
                                          &input_row.redo_head,
-                                         &arena_,
+                                         &mem_.arena,
                                          &insertion_timestamp));
 
       // Handle the rare case where a row was inserted and deleted in the same 
operation.
@@ -165,7 +165,7 @@ class MemRowSetCompactionInput : public CompactionInput {
 
       // Materialize MRSRow undo insert (delete)
       undo_encoder.SetToDelete();
-      input_row.undo_head = Mutation::CreateInArena(&arena_,
+      input_row.undo_head = Mutation::CreateInArena(&mem_.arena,
                                                     insertion_timestamp,
                                                     
undo_encoder.as_changelist());
       undo_encoder.Reset();
@@ -181,7 +181,7 @@ class MemRowSetCompactionInput : public CompactionInput {
     return Status::OK();
   }
 
-  Arena* PreparedBlockArena() override { return &arena_; }
+  Arena* PreparedBlockArena() override { return &mem_.arena; }
 
   Status FinishBlock() override {
     return Status::OK();
@@ -197,8 +197,8 @@ class MemRowSetCompactionInput : public CompactionInput {
 
   unique_ptr<MemRowSet::Iterator> iter_;
 
-  // Arena used to store the projected undo/redo mutations of the current 
block.
-  Arena arena_;
+  // Memory used to store the projected undo/redo mutations of the current 
block.
+  RowBlockMemory mem_;
 
   faststring buffer_;
 
@@ -502,8 +502,9 @@ class MergeCompactionInput : public CompactionInput {
                    << CompactionInputRowToString(*smallest);
           continue;
         }
-        // If we found two rows with the same key, we want to make the newer 
one point to the older
-        // one, which must be a ghost.
+
+        // If we found two rows with the same key, we want to make the newer
+        // one point to the older one, which must be a ghost.
         if (PREDICT_FALSE(row_comp == 0)) {
           DVLOG(4) << "Duplicate row.\nLeft: " << 
CompactionInputRowToString(*state->next())
                    << "\nRight: " << CompactionInputRowToString(*smallest);
@@ -523,7 +524,7 @@ class MergeCompactionInput : public CompactionInput {
           }
           // .. otherwise copy and pop the other one.
           RETURN_NOT_OK(SetPreviousGhost(smallest, state->next(), true /* 
clone */,
-                                         smallest->row.row_block()->arena()));
+                                         
DCHECK_NOTNULL(smallest)->row.row_block()->arena()));
           DVLOG(4) << "Updated left duplicate smallest: "
                    << CompactionInputRowToString(*smallest);
           states_[i]->pop_front();
@@ -658,8 +659,12 @@ class MergeCompactionInput : public CompactionInput {
     return Status::OK();
   }
 
-  // Sets the previous ghost row for a CompactionInputRow.
-  // 'must_copy' indicates whether there must be a deep copy (using 
CloneCompactionInputRow()).
+  // Merges the 'previous_ghost' histories of 'older' and 'newer' such that
+  // 'older->previous_ghost' is the head of a list of rows in increasing
+  // timestamp order (deltas get newer down the list).
+  //
+  // 'must_copy' indicates whether there must be a deep copy (using
+  // CloneCompactionInputRow()).
   Status SetPreviousGhost(CompactionInputRow* older,
                           CompactionInputRow* newer,
                           bool must_copy,
@@ -669,7 +674,7 @@ class MergeCompactionInput : public CompactionInput {
     // recent.
     if (older->previous_ghost != nullptr) {
       if (CompareDuplicatedRows(*older->previous_ghost, *newer) > 0) {
-        // 'older' was more recent.
+        // 'older->previous_ghost' was more recent.
         return SetPreviousGhost(older->previous_ghost, newer, must_copy /* 
clone */, arena);
       }
       // 'newer' was more recent.
@@ -766,8 +771,8 @@ Mutation* MergeUndoHistories(Mutation* left, Mutation* 
right) {
   return head;
 }
 
-// If 'old_row' has previous versions, this transforms prior version in undos 
and adds them
-// to 'new_undo_head'.
+// If 'old_row' has previous versions, this transforms prior version in undos
+// and adds them to 'new_undo_head'.
 Status MergeDuplicatedRowHistory(const string& tablet_id,
                                  const FsErrorManager* error_manager,
                                  CompactionInputRow* old_row,
@@ -992,6 +997,12 @@ void RemoveAncientUndos(const HistoryGcOpts& 
history_gc_opts,
   }
 }
 
+// Applies the REDOs of 'src_row' in accordance with the input snapshot,
+// returning the result in 'dst_row', and converting those REDOs to UNDOs,
+// returned via 'new_undo_head' and any remaining REDO (e.g. a delete) in
+// 'new_redo_head'.
+//
+// NOTE: input REDOs are expected to be in increasing timestamp order.
 Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& snap,
                                       const CompactionInputRow& src_row,
                                       Mutation** new_undo_head,
@@ -1152,6 +1163,8 @@ Status FlushCompactionInput(const string& tablet_id,
       Mutation* new_undos_head = nullptr;
       Mutation* new_redos_head = nullptr;
 
+      // Apply all REDOs to the base row, generating UNDOs for it. This does
+      // not take into account any 'previous_ghost' members.
       RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(snap,
                                                    *input_row,
                                                    &new_undos_head,

Reply via email to