This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 3bdaf50b5565bb9f8b4a0652f94102c7f272ebb1
Author: Ashwani Raina <[email protected]>
AuthorDate: Sat Oct 7 19:02:45 2023 +0530

    [compaction] Skip memory allocation for ancient undo deltas
    
    Currently, while applying REDO mutations to the base row and creating
    the corresponding UNDO delta for each REDO mutation, the compaction code
    doesn't check whether a mutation is ancient, even though ancient
    mutations are ignored and removed from the list of UNDO deltas at a
    later stage of processing anyway. This change checks each REDO mutation
    beforehand and skips allocating memory for it if it is found to be
    ancient, which avoids unnecessary memory usage.
    
    Change-Id: Ibb41636a3ac063478fe181560d4ec85dadeb0ef3
    Reviewed-on: http://gerrit.cloudera.org:8080/20546
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/tablet/compaction.cc             |  67 +++++++++++---
 src/kudu/tablet/compaction.h              |   3 +-
 src/kudu/tablet/delta_compaction.cc       |   4 +-
 src/kudu/tablet/tablet-test-base.h        |   2 +-
 src/kudu/tablet/tablet_history_gc-test.cc | 146 ++++++++++++++++++++++++++++++
 5 files changed, 206 insertions(+), 16 deletions(-)

diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index 858d2b658..8f2bba384 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -987,7 +987,8 @@ Status MergeDuplicatedRowHistory(const string& tablet_id,
                                  const scoped_refptr<FsErrorManager>& 
error_manager,
                                  CompactionInputRow* old_row,
                                  Mutation** new_undo_head,
-                                 Arena* arena) {
+                                 Arena* arena,
+                                 const HistoryGcOpts& history_gc_opts) {
   if (PREDICT_TRUE(old_row->previous_ghost == nullptr)) return Status::OK();
 
   // Use an all inclusive snapshot as all of the previous version's undos and 
redos
@@ -1013,7 +1014,8 @@ Status MergeDuplicatedRowHistory(const string& tablet_id,
                                                  &pv_new_undos_head,
                                                  &pv_delete_redo,
                                                  arena,
-                                                 &previous_ghost->row));
+                                                 &previous_ghost->row,
+                                                 history_gc_opts));
 
     // We should be left with only one redo, the delete.
 #ifndef NDEBUG
@@ -1222,7 +1224,8 @@ Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& 
snap,
                                       Mutation** new_undo_head,
                                       Mutation** new_redo_head,
                                       Arena* arena,
-                                      RowBlockRow* dst_row) {
+                                      RowBlockRow* dst_row,
+                                      const HistoryGcOpts& history_gc_opts) {
   bool is_deleted = false;
 
   #define ERROR_LOG_CONTEXT \
@@ -1236,6 +1239,10 @@ Status ApplyMutationsAndGenerateUndos(const 
MvccSnapshot& snap,
   // Const cast this away here since we're ever only going to point to it
   // which doesn't actually mutate it and having Mutation::set_next()
   // take a non-const value is required in other places.
+  // If there are any existing ancient undo mutations in the list,
+  // those will be removed in the caller i.e. FlushCompactionInput
+  // post processing of 'applying the mutations' and 'merging duplicate
+  // history'.
   Mutation* undo_head = const_cast<Mutation*>(src_row.undo_head);
   Mutation* redo_delete = nullptr;
 
@@ -1274,6 +1281,14 @@ Status ApplyMutationsAndGenerateUndos(const 
MvccSnapshot& snap,
           continue;
         }
 
+        // If the timestamp of current redo mutation is ancient, don't consider
+        // it when converting to undo delta. Any ancient redo mutation is
+        // not taken into consideration when maintaining the past undo deltas
+        // as all these ancient updates are expected to be GC'ed anyway.
+        if (history_gc_opts.IsAncientHistory(redo_mut->timestamp())) {
+          continue;
+        }
+
         // create the UNDO mutation in the provided arena.
         current_undo = Mutation::CreateInArena(arena, redo_mut->timestamp(),
                                                undo_encoder.as_changelist());
@@ -1286,6 +1301,22 @@ Status ApplyMutationsAndGenerateUndos(const 
MvccSnapshot& snap,
         redo_decoder.TwiddleDeleteStatus(&is_deleted);
         // Delete mutations are left as redos. Encode the DELETE as a redo.
         undo_encoder.SetToDelete();
+
+        // No need to check whether current redo mutation is ancient or not.
+        // There are two cases hereafter:
+        // 1. If this is the last mutation (of type DELETE) for this row and
+        //    if this mutation turns out to be ancient, it will be detected
+        //    in caller while processing ancient undo deltas. Essentially,
+        //    all the mutations for this row will be GC'ed in such a case with
+        //    just one allocation for redo_delete which is acceptable.
+        //
+        // 2. If there is at least a subsequent mutation for this row, it
+        //    would be a re-insert case as noted below. For such scenario,
+        //    redo_delete is anyway going to get deleted.
+        // As both cases are only causing one time object allocation for short
+        // duration, we can let this allocation go through without checking for
+        // ancient mark in order to avoid too many changes to current logic of
+        // handling DELETE and REINSERT mutations.
         redo_delete = Mutation::CreateInArena(arena,
                                               redo_mut->timestamp(),
                                               undo_encoder.as_changelist());
@@ -1294,7 +1325,7 @@ Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& 
snap,
       // When we see a reinsert REDO we do the following:
       // 1 - Reset the REDO head, which contained a DELETE REDO.
       // 2 - Apply the REINSERT to the row, passing an undo_encoder that 
encodes the state of
-      //     the row prior to to the REINSERT.
+      //     the row prior to the REINSERT.
       // 3 - Create a mutation for the REINSERT above and add it to the UNDOs, 
this mutation
       //     will have the timestamp of the DELETE REDO.
       // 4 - Create a new delete UNDO. This mutation will have the timestamp 
of the REINSERT REDO.
@@ -1315,12 +1346,22 @@ Status ApplyMutationsAndGenerateUndos(const 
MvccSnapshot& snap,
             dst_row, static_cast<Arena*>(nullptr), &undo_encoder), ERROR,
                           "Unable to apply reinsert undo. \n" + 
ERROR_LOG_CONTEXT);
 
-        // 3 - Create a mutation for the REINSERT above and add it to the 
UNDOs.
-        current_undo = Mutation::CreateInArena(arena,
-                                               delete_ts,
-                                               undo_encoder.as_changelist());
-        SetHead(&undo_head, current_undo);
+        // Check if redo mutation of type DELETE from previous iteration is
+        // ancient. Skip creating undo object for the same if it is ancient.
+        // We still want to process redo mutation from current iteration 
though.
+        if (!history_gc_opts.IsAncientHistory(delete_ts)) {
+          // 3 - Create a mutation for the REINSERT above and add it to the 
UNDOs.
+          current_undo = Mutation::CreateInArena(arena,
+                                                 delete_ts,
+                                                 undo_encoder.as_changelist());
+          SetHead(&undo_head, current_undo);
+        }
 
+        // Check if redo mutation from current iteration is ancient. Skip
+        // creating corresponding undo object for the same if it is ancient.
+        if (history_gc_opts.IsAncientHistory(redo_mut->timestamp())) {
+          continue;
+        }
         // 4 - Create a DELETE mutation and add it to the UNDOs.
         undo_encoder.Reset();
         undo_encoder.SetToDelete();
@@ -1384,17 +1425,19 @@ Status FlushCompactionInput(const string& tablet_id,
                                                    &new_undos_head,
                                                    &new_redos_head,
                                                    input->PreparedBlockArena(),
-                                                   &dst_row));
+                                                   &dst_row,
+                                                   history_gc_opts));
 
       // Merge the histories of 'input_row' with previous ghosts, if there are 
any.
       RETURN_NOT_OK(MergeDuplicatedRowHistory(tablet_id,
                                               error_manager,
                                               input_row,
                                               &new_undos_head,
-                                              input->PreparedBlockArena()));
+                                              input->PreparedBlockArena(),
+                                              history_gc_opts));
 
       // Remove ancient UNDOS and check whether the row should be garbage 
collected.
-      bool is_garbage_collected;
+      bool is_garbage_collected = false;
       RemoveAncientUndos(history_gc_opts,
                          &new_undos_head,
                          new_redos_head,
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index 503667a74..8b085958c 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -226,7 +226,8 @@ Status ApplyMutationsAndGenerateUndos(const MvccSnapshot& 
snap,
                                       Mutation** new_undo_head,
                                       Mutation** new_redo_head,
                                       Arena* arena,
-                                      RowBlockRow* dst_row);
+                                      RowBlockRow* dst_row,
+                                      const HistoryGcOpts& history_gc_opts);
 
 // Iterate through this compaction input, flushing all rows to the given 
RollingDiskRowSetWriter.
 // The 'snap' argument should match the MvccSnapshot used to create the 
compaction input.
diff --git a/src/kudu/tablet/delta_compaction.cc 
b/src/kudu/tablet/delta_compaction.cc
index b0e579269..6a582530d 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -183,8 +183,8 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const 
IOContext* io_context) {
       bool is_garbage_collected;
 
       RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(
-          snap, *input_row, &new_undos_head, &new_redos_head, &mem.arena, 
&dst_row));
-
+          snap, *input_row, &new_undos_head, &new_redos_head, &mem.arena,
+          &dst_row, history_gc_opts_));
       RemoveAncientUndos(history_gc_opts_,
                          &new_undos_head,
                          new_redos_head,
diff --git a/src/kudu/tablet/tablet-test-base.h 
b/src/kudu/tablet/tablet-test-base.h
index 2461eda19..f82692a4c 100644
--- a/src/kudu/tablet/tablet-test-base.h
+++ b/src/kudu/tablet/tablet-test-base.h
@@ -572,7 +572,7 @@ class TabletTestBase : public KuduTabletTest {
     }
     ASSERT_EQ(expected_row_count, actual_row_count)
         << "Expected row count didn't match actual row count";
-    LOG(INFO) << "Successfully verified " << expected_row_count << "rows";
+    LOG(INFO) << Substitute("Successfully verified $0 rows", 
expected_row_count);
   }
 
   // Iterate through the full table, stringifying the resulting rows
diff --git a/src/kudu/tablet/tablet_history_gc-test.cc 
b/src/kudu/tablet/tablet_history_gc-test.cc
index e8541a475..24fd834d4 100644
--- a/src/kudu/tablet/tablet_history_gc-test.cc
+++ b/src/kudu/tablet/tablet_history_gc-test.cc
@@ -250,6 +250,69 @@ TEST_F(TabletHistoryGcTest, 
TestNoGenerateUndoOnMajorDeltaCompaction) {
       R"(Redo Mutations: \[\];$)"));
 }
 
+// Test that we do not generate undos for redo operations that are older than
+// the AHM during rowset compaction.
+TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnRowSetCompaction) {
+  FLAGS_tablet_history_max_age_sec = 1; // Keep history for 1 second.
+
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
+  NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), 
kRowsEqual0));
+  Timestamp time_after_insert = clock()->Now();
+
+  // Timestamps recorded after each round of updates.
+  Timestamp post_update_ts[2];
+
+  // Mutate all of the rows, setting val=1. Then again for val=2.
+  LocalTabletWriter writer(tablet().get(), &client_schema_);
+  for (int val = 1; val <= 2; val++) {
+    for (int row_idx = 0; row_idx < TotalNumRows(); row_idx++) {
+      ASSERT_OK(UpdateTestRow(&writer, row_idx, val));
+    }
+    // We must flush the DMS before rowset compaction can operate on these 
REDOs.
+    for (int i = 0; i < kNumRowsets; i++) {
+      ASSERT_OK(tablet()->FlushBiggestDMSForTests());
+    }
+    post_update_ts[val - 1] = clock()->Now();
+  }
+
+  // Move the AHM beyond our mutations, which are represented as REDOs.
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(2)));
+
+  // At this point, only mutations present are residing in REDO deltas until
+  // next compaction cycle kicks in. Current-time reads should give us 2, but
+  // reads from the past should give us 0 or 1.
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(),
+                                                   time_after_insert, 
kRowsEqual0));
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(),
+                                                   post_update_ts[0], 
kRowsEqual1));
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(),
+                                                   post_update_ts[1], 
kRowsEqual2));
+  NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), 
kRowsEqual2));
+
+  // Run rowset compaction.
+  ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
+
+  // RowSet compaction has applied all the REDO mutations on base data and also
+  // GC'ed all the existing UNDO deltas. So, current-time reads should give us
+  // 2, and reads from the past should also give us 2.
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(),
+                                                   time_after_insert, 
kRowsEqual2));
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(),
+                                                   post_update_ts[0], 
kRowsEqual2));
+  NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(),
+                                                   post_update_ts[1], 
kRowsEqual2));
+  NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), 
kRowsEqual2));
+
+
+  // Now, we should have base data = 2 with no other historical values.
+  // RowSet compaction will apply all the outstanding ancient REDO deltas and
+  // remove any ancient existing UNDOs, so we don't expect any UNDO deltas or
+  // REDO deltas at all, at this point.
+  NO_FATALS(VerifyDebugDumpRowsMatch(
+      R"(int32 val=2\); Undo Mutations: \[\]; )"
+      R"(Redo Mutations: \[\];$)"));
+}
+
 // Test that major delta compaction works when run on a subset of columns:
 // 1. Insert rows and flush to DiskRowSets.
 // 2. Mutate two columns.
@@ -475,6 +538,89 @@ TEST_F(TabletHistoryGcTest, TestGhostRowsNotRevived) {
       R"(int32 val=3\); Undo Mutations: \[\]; Redo Mutations: \[\];)"));
 }
 
+// Verify data integrity with "ghost" rows and their corresponding updates
+// across AHM.
+TEST_F(TabletHistoryGcTest, TestGhostRowsUpatesAHM) {
+  FLAGS_tablet_history_max_age_sec = 100;
+
+  LocalTabletWriter writer(tablet().get(), &client_schema_);
+
+  // Step 1: Iteratively insert, update and delete the row along with flush
+  // to create ghost rows in different rowsets.
+  for (int i = 0; i <= 1; i++) {
+    ASSERT_OK(InsertTestRow(&writer, 0, i));
+    ASSERT_OK(UpdateTestRow(&writer, 0, i + 10));
+    ASSERT_OK(DeleteTestRow(&writer, 0));
+    ASSERT_OK(tablet()->Flush());
+  }
+
+  // Step 2: Insert the row of same key with a new value.
+  // This will also end up in a different rowset with its own mutation list.
+  ASSERT_OK(InsertTestRow(&writer, 0, 2));
+  ASSERT_OK(tablet()->Flush());
+
+  // We should end up with three rowsets with presence of ghost rows with
+  // their corresponding updates.
+  // Step1: (Insert 0 -> Update 10 -> Delete)
+  // (int32 val=10); Undo Mutations: [@TS1(SET val=0), @TS0(DELETE)];
+  // Redo Mutations: [@TS2(DELETE)];
+  // (Insert 1 -> Update 11 -> Delete)
+  // (int32 val=11); Undo Mutations: [@TS4(SET val=1), @TS3(DELETE)];
+  // Redo Mutations: [@TS5(DELETE)];
+  // Step 2: Insert 2
+  // (int32 val=2); Undo Mutations: [@TS6(DELETE)]; Redo Mutations: [];
+  NO_FATALS(VerifyDebugDumpRowsMatch(
+      R"(int32 val=10\); Undo Mutations: \[@[[:digit:]]+\(SET val=0\), )"
+      R"(@[[:digit:]]+\(DELETE\)\]; Redo Mutations: 
\[@[[:digit:]]+\(DELETE\)\];)"
+      R"(|)"
+      R"(int32 val=11\); Undo Mutations: \[@[[:digit:]]+\(SET val=1\), )"
+      R"(@[[:digit:]]+\(DELETE\)\]; Redo Mutations: 
\[@[[:digit:]]+\(DELETE\)\];)"
+      R"(|)"
+      R"(int32 val=2\); Undo Mutations: \[@[[:digit:]]+\(DELETE\)\]; )"
+      R"(Redo Mutations: \[\];)"));
+
+  // Move the clock ahead to ensure all the updates until now are deemed 
ancient.
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(200)));
+
+  // Step 3: Update test row with new value that will be applied to last rowset
+  // from Step 2 which has just an INSERT (i.e. a non-ghost row).
+  for (int i = 0; i <= 1; i++) {
+    ASSERT_OK(UpdateTestRow(&writer, 0, i + 20));
+    ASSERT_OK(tablet()->Flush());
+  }
+
+  // Until compaction runs, we expect to have three rowsets with same number of
+  // ghost rows and latest couple of updates (from Step 3) to non-ghost row.
+  // Step 3: Insert 2 -> Update 20 -> Update 21
+  // (int32 val=2); Undo Mutations: [@TS6(DELETE)];
+  // Redo Mutations: [@TS7(SET val=20), @TS8(SET val=21)];
+  NO_FATALS(VerifyDebugDumpRowsMatch(
+      R"(int32 val=10\); Undo Mutations: \[@[[:digit:]]+\(SET val=0\), )"
+      R"(@[[:digit:]]+\(DELETE\)\]; Redo Mutations: 
\[@[[:digit:]]+\(DELETE\)\];)"
+      R"(|)"
+      R"(int32 val=11\); Undo Mutations: \[@[[:digit:]]+\(SET val=1\), )"
+      R"(@[[:digit:]]+\(DELETE\)\]; Redo Mutations: 
\[@[[:digit:]]+\(DELETE\)\];)"
+      R"(|)"
+      R"(int32 val=2\); Undo Mutations: \[@[[:digit:]]+\(DELETE\)\]; )"
+      R"(Redo Mutations: \[@[[:digit:]]+\(SET val=20\), )"
+      R"(@[[:digit:]]+\(SET val=21\)\];)"));
+
+  // Step 4: This should result in a rowset with single row as base data.
+  ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
+
+  // Now the compaction has completed, we expect to have a rowset with no ghost
+  // rows and only the latest updates (i.e. non-ancient from Step 3) present in
+  // base data and UNDO deltas. Also, compaction applies all the REDO deltas,
+  // hence no REDO delta is expected to be present.
+  // Step 4: Insert 2 -> Update 20 -> Update 21
+  // (int32 val=21); Undo Mutations: [@TS8(SET val=20), @TS7(SET val=2)];
+  // Redo Mutations: [];
+  NO_FATALS(VerifyDebugDumpRowsMatch(
+      R"(int32 val=21\); Undo Mutations: \[@[[:digit:]]+\(SET val=20\), )"
+      R"(@[[:digit:]]+\(SET val=2\)\]; )"
+      R"(Redo Mutations: \[\];$)"));
+}
+
 // Test to ensure that nothing bad happens when we partially GC different rows
 // in a rowset. We delete alternating keys to end up with a mix of GCed and
 // non-GCed rows in each rowset.

Reply via email to