This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 89e2715fa [compaction] Add tests to generate high memory rowset compaction
89e2715fa is described below

commit 89e2715faf96afe0b67482166fda9c8699e8052f
Author: Ashwani Raina <[email protected]>
AuthorDate: Tue Dec 19 17:32:31 2023 +0530

    [compaction] Add tests to generate high memory rowset compaction
    
    This change implements a better approach to test out
    high memory compaction case. There is an existing patch:
    https://gerrit.cloudera.org/#/c/19278/
    that uses a different approach which does not scale
    well because of excessive memory usage by the client.
    
    Patch contains two tests:
    1. TestRowSetCompactionProceedWithNoBudgetingConstraints
    (generates deltas with memory requirements under budget)
    2. TestRowSetCompactionSkipWithBudgetingConstraints
    (generates deltas with memory requirements over budget)
    
    and a helper function:
    GenHighMemConsumptionDeltas
    Using this helper function, callers can generate deltas of different
    sizes as per test needs. The size_factor can be used to achieve
    that. Increasing size_factor by 1 adds 1 MB worth of deltas as
    far as rowset compaction memory consumption is concerned.
    
    Change-Id: I1996558e71c49314c6acf12faf854c796548318c
    Reviewed-on: http://gerrit.cloudera.org:8080/20816
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/tablet/compaction-test.cc | 141 +++++++++++++++++++++++++++++++++++++
 1 file changed, 141 insertions(+)

diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index 7abc0b4ca..1d9abfe1b 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -41,6 +41,7 @@
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row.h"
 #include "kudu/common/row_changelist.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/rowblock_memory.h"
 #include "kudu/common/rowid.h"
@@ -56,6 +57,7 @@
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/diskrowset.h"
 #include "kudu/tablet/local_tablet_writer.h"
@@ -71,6 +73,7 @@
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/util/env.h"
 #include "kudu/util/faststring.h"
+#include "kudu/util/logging_test_util.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/random.h"
@@ -79,6 +82,7 @@
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
+#include "kudu/util/trace.h"
 
 DEFINE_string(merge_benchmark_input_dir, "",
               "Directory to benchmark merge. The benchmark will merge "
@@ -91,7 +95,11 @@ DEFINE_uint32(merge_benchmark_num_rowsets, 3,
 DEFINE_uint32(merge_benchmark_num_rows_per_rowset, 500000,
               "Number of rowsets as input to the merge");
 
+DECLARE_bool(rowset_compaction_memory_estimate_enabled);
+DECLARE_bool(rowset_compaction_ancient_delta_threshold_enabled);
+DECLARE_double(rowset_compaction_delta_memory_factor);
 DECLARE_string(block_manager);
+DECLARE_uint32(rowset_compaction_estimate_min_deltas_size_mb);
 
 using kudu::consensus::OpId;
 using kudu::log::LogAnchorRegistry;
@@ -131,6 +139,50 @@ class TestCompaction : public KuduRowSetTest {
     return builder.BuildWithoutIds();
   }
 
+  Status InsertOrUpsertTestRows(RowOperationsPB::Type type,
+                                int64_t first_row,
+                                int64_t count,
+                                int32_t val) {
+    LocalTabletWriter writer(tablet().get(), &client_schema());
+    KuduPartialRow row(&client_schema());
+
+    for (int64_t i = first_row; i < first_row + count; i++) {
+      RETURN_NOT_OK(row.SetStringCopy("key", Substitute("hello $0", i)));
+      RETURN_NOT_OK(row.SetInt64("val", val));
+      if (type == RowOperationsPB::INSERT) {
+        RETURN_NOT_OK(writer.Insert(row));
+      } else if (type == RowOperationsPB::UPSERT) {
+        RETURN_NOT_OK(writer.Upsert(row));
+      } else {
+        return Status::InvalidArgument(
+            Substitute("unknown row operation type: $0", type));
+      }
+    }
+    return Status::OK();
+  }
+
+  void InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset) {
+    for (int64_t rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
+      ASSERT_OK(InsertOrUpsertTestRows(RowOperationsPB::INSERT,
+                                       rowset_id * rows_per_rowset,
+                                       rows_per_rowset,
+                                       /*val*/0));
+      ASSERT_OK(tablet()->Flush());
+    }
+    ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
+  }
+
+  void UpdateOriginalRowsNoFlush(int64_t num_rowsets, int64_t rows_per_rowset,
+      int32_t val) {
+    for (int64_t rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
+      ASSERT_OK(InsertOrUpsertTestRows(RowOperationsPB::UPSERT,
+                                       rowset_id * rows_per_rowset,
+                                       rows_per_rowset,
+                                       val));
+    }
+    ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
+  }
+
   // Insert n_rows rows of data.
   // Each row is the tuple: (string key=hello <n*10 + delta>, val=<n>)
   void InsertRows(MemRowSet* mrs, int n_rows, int delta) {
@@ -554,6 +606,18 @@ class TestCompaction : public KuduRowSetTest {
   void AddExpectedReinsert(Mutation** current_head, int64_t val);
  void AddUpdateAndDelete(RowSet* rs, CompactionInputRow* row, int64_t row_id, int64_t val);
 
+  // Workload to generate large sized deltas for compaction.
+  // The method generates 1 MB size worth of deltas with size_factor as 1.
+  // Callers can adjust the size_factor.
+  // For example, to generate 5MB, set size_factor as 5.
+  // Similarly, to generate 35MB, set size_factor as 35.
+  void GenHighMemConsumptionDeltas(uint32_t size_factor);
+
+  // Enables compaction memory budgeting and then runs rowset compaction.
+  // Caller can set constraints on budget and expect the results accordingly.
+  // If constraints are applied, compaction may be skipped.
+  void TestRowSetCompactionWithOrWithoutBudgetingConstraints(bool budgeting_constraints_applied);
+
  protected:
   OpId op_id_;
 
@@ -568,6 +632,34 @@ class TestCompaction : public KuduRowSetTest {
   TabletMemTrackers mem_trackers_;
 };
 
+// This test adds workload of rowsets updates in order to
+// generate some number of REDO deltas. Along with that, memory
+// budgeting constraints denoted by flags are enabled in order
+// to make sure that when rowset compaction is invoked, it takes
+// into consideration the amount of free memory left and based on
+// that proceed with the compaction because of availability of memory.
+TEST_F(TestCompaction, TestRowSetCompactionProceedWithNoBudgetingConstraints) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // 1 as mem factor implies ~(2*1)MB memory requirements, ok for compaction to proceed
+  FLAGS_rowset_compaction_delta_memory_factor = 1;
+  TestRowSetCompactionWithOrWithoutBudgetingConstraints(false);
+}
+
+// This test adds workload of rowsets updates in order to
+// generate huge number of REDO deltas. Along with that, memory
+// budgeting constraints denoted by flags are enabled in order
+// to make sure that when rowset compaction is invoked, it takes
+// into consideration the amount of free memory left and based on
+// that skip the compaction because of lack of memory.
+TEST_F(TestCompaction, TestRowSetCompactionSkipWithBudgetingConstraints) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // 1024000 mem factor implies ~(2*1024000)MB memory requirements forces to skip compaction
+  FLAGS_rowset_compaction_delta_memory_factor = 1024000;
+  TestRowSetCompactionWithOrWithoutBudgetingConstraints(true);
+}
+
 TEST_F(TestCompaction, TestMemRowSetInput) {
   // Create a memrowset with 10 rows and several updates.
   shared_ptr<MemRowSet> mrs;
@@ -785,6 +877,55 @@ void TestCompaction::AddUpdateAndDelete(
   AddExpectedReinsert(&row->undo_head, val);
 }
 
+void TestCompaction::TestRowSetCompactionWithOrWithoutBudgetingConstraints(
+    bool budgeting_constraints_applied) {
+  FLAGS_rowset_compaction_memory_estimate_enabled = true;
+  FLAGS_rowset_compaction_ancient_delta_threshold_enabled = true;
+
+  // Ensure memory budgeting applies
+  FLAGS_rowset_compaction_estimate_min_deltas_size_mb = 0;
+
+  // size factor as 2 generates ~2MB memory size worth of deltas
+  GenHighMemConsumptionDeltas(2);
+
+  // Run rowset compaction.
+  StringVectorSink sink;
+  ScopedRegisterSink reg(&sink);
+  scoped_refptr<Trace> trace(new Trace);
+  Stopwatch sw;
+  sw.start();
+  {
+    ADOPT_TRACE(trace.get());
+    ASSERT_OK(tablet()->Compact(Tablet::COMPACT_NO_FLAGS));
+  }
+  sw.stop();
+  LOG(INFO) << Substitute("CompactRowSetsOp complete. Timing: $0 Metrics: $1",
+                            sw.elapsed().ToString(),
+                            trace->MetricsAsJSON());
+
+  if (budgeting_constraints_applied) {
+    ASSERT_STR_CONTAINS(JoinStrings(sink.logged_msgs(), "\n"),
+                        "removed from compaction input due to memory constraints");
+  } else {
+    ASSERT_STR_NOT_CONTAINS(JoinStrings(sink.logged_msgs(), "\n"),
+                            "removed from compaction input due to memory constraints");
+  }
+}
+
+void TestCompaction::GenHighMemConsumptionDeltas(const uint32_t size_factor) {
+  const uint32_t num_rowsets = 10;
+  const uint32_t num_rows_per_rowset = 2;
+  const uint32_t num_updates = 5000 * size_factor;
+
+  NO_FATALS(InsertOriginalRows(num_rowsets, num_rows_per_rowset));
+
+  // Mutate all of the rows.
+  for (int i = 1; i <= num_updates; i++) {
+    UpdateOriginalRowsNoFlush(num_rowsets, num_rows_per_rowset, i);
+  }
+  ASSERT_OK(tablet()->FlushAllDMSForTests());
+}
+
 // Build several layers of overlapping rowsets with many ghost rows.
// Repeatedly merge all the generated RowSets until we are left with a single RowSet, then make
 // sure that its history matches our expected history.

Reply via email to