This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 89e2715fa [compaction] Add tests to generate high memory rowset compaction
89e2715fa is described below
commit 89e2715faf96afe0b67482166fda9c8699e8052f
Author: Ashwani Raina <[email protected]>
AuthorDate: Tue Dec 19 17:32:31 2023 +0530
[compaction] Add tests to generate high memory rowset compaction
This change implements a better approach to test out
high memory compaction case. There is an existing patch:
https://gerrit.cloudera.org/#/c/19278/
that uses a different approach which does not scale
well because of excessive memory usage by the client.
Patch contains two tests:
1. TestRowSetCompactionProceedWithNoBudgetingConstraints
(generates deltas with memory requirements under budget)
2. TestRowSetCompactionSkipWithBudgetingConstraints
(generates deltas with memory requirements over budget)
and a helper function:
GenHighMemConsumptionDeltas
Using this helper function, callers can generate deltas of different
sizes as per test needs. The size_factor can be used to achieve
that. Increasing size_factor by 1 adds 1 MB worth of deltas as
far as rowset compaction memory consumption is concerned.
Change-Id: I1996558e71c49314c6acf12faf854c796548318c
Reviewed-on: http://gerrit.cloudera.org:8080/20816
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/tablet/compaction-test.cc | 141 +++++++++++++++++++++++++++++++++++++
1 file changed, 141 insertions(+)
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index 7abc0b4ca..1d9abfe1b 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -41,6 +41,7 @@
#include "kudu/common/partial_row.h"
#include "kudu/common/row.h"
#include "kudu/common/row_changelist.h"
+#include "kudu/common/row_operations.pb.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/rowblock_memory.h"
#include "kudu/common/rowid.h"
@@ -56,6 +57,7 @@
#include "kudu/gutil/casts.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/diskrowset.h"
#include "kudu/tablet/local_tablet_writer.h"
@@ -71,6 +73,7 @@
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/util/env.h"
#include "kudu/util/faststring.h"
+#include "kudu/util/logging_test_util.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/monotime.h"
#include "kudu/util/random.h"
@@ -79,6 +82,7 @@
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
+#include "kudu/util/trace.h"
DEFINE_string(merge_benchmark_input_dir, "",
"Directory to benchmark merge. The benchmark will merge "
@@ -91,7 +95,11 @@ DEFINE_uint32(merge_benchmark_num_rowsets, 3,
DEFINE_uint32(merge_benchmark_num_rows_per_rowset, 500000,
"Number of rowsets as input to the merge");
+DECLARE_bool(rowset_compaction_memory_estimate_enabled);
+DECLARE_bool(rowset_compaction_ancient_delta_threshold_enabled);
+DECLARE_double(rowset_compaction_delta_memory_factor);
DECLARE_string(block_manager);
+DECLARE_uint32(rowset_compaction_estimate_min_deltas_size_mb);
using kudu::consensus::OpId;
using kudu::log::LogAnchorRegistry;
@@ -131,6 +139,50 @@ class TestCompaction : public KuduRowSetTest {
return builder.BuildWithoutIds();
}
+ Status InsertOrUpsertTestRows(RowOperationsPB::Type type,
+ int64_t first_row,
+ int64_t count,
+ int32_t val) {
+ LocalTabletWriter writer(tablet().get(), &client_schema());
+ KuduPartialRow row(&client_schema());
+
+ for (int64_t i = first_row; i < first_row + count; i++) {
+ RETURN_NOT_OK(row.SetStringCopy("key", Substitute("hello $0", i)));
+ RETURN_NOT_OK(row.SetInt64("val", val));
+ if (type == RowOperationsPB::INSERT) {
+ RETURN_NOT_OK(writer.Insert(row));
+ } else if (type == RowOperationsPB::UPSERT) {
+ RETURN_NOT_OK(writer.Upsert(row));
+ } else {
+ return Status::InvalidArgument(
+ Substitute("unknown row operation type: $0", type));
+ }
+ }
+ return Status::OK();
+ }
+
+ void InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset) {
+ for (int64_t rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
+ ASSERT_OK(InsertOrUpsertTestRows(RowOperationsPB::INSERT,
+ rowset_id * rows_per_rowset,
+ rows_per_rowset,
+ /*val*/0));
+ ASSERT_OK(tablet()->Flush());
+ }
+ ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
+ }
+
+ void UpdateOriginalRowsNoFlush(int64_t num_rowsets, int64_t rows_per_rowset,
+ int32_t val) {
+ for (int64_t rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
+ ASSERT_OK(InsertOrUpsertTestRows(RowOperationsPB::UPSERT,
+ rowset_id * rows_per_rowset,
+ rows_per_rowset,
+ val));
+ }
+ ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
+ }
+
// Insert n_rows rows of data.
// Each row is the tuple: (string key=hello <n*10 + delta>, val=<n>)
void InsertRows(MemRowSet* mrs, int n_rows, int delta) {
@@ -554,6 +606,18 @@ class TestCompaction : public KuduRowSetTest {
void AddExpectedReinsert(Mutation** current_head, int64_t val);
void AddUpdateAndDelete(RowSet* rs, CompactionInputRow* row, int64_t row_id,
int64_t val);
+ // Workload to generate large sized deltas for compaction.
+ // The method generates 1 MB size worth of deltas with size_factor as 1.
+ // Callers can adjust the size_factor.
+ // For example, to generate 5MB, set size_factor as 5.
+ // Similarly, to generate 35MB, set size_factor as 35.
+ void GenHighMemConsumptionDeltas(uint32_t size_factor);
+
+ // Enables compaction memory budgeting and then runs rowset compaction.
+ // Caller can set constraints on budget and expect the results accordingly.
+ // If constraints are applied, compaction may be skipped.
+ void TestRowSetCompactionWithOrWithoutBudgetingConstraints(bool budgeting_constraints_applied);
+
protected:
OpId op_id_;
@@ -568,6 +632,34 @@ class TestCompaction : public KuduRowSetTest {
TabletMemTrackers mem_trackers_;
};
+// This test adds workload of rowsets updates in order to
+// generate some number of REDO deltas. Along with that, memory
+// budgeting constraints denoted by flags are enabled in order
+// to make sure that when rowset compaction is invoked, it takes
+// into consideration the amount of free memory left and based on
+// that proceed with the compaction because of availability of memory.
+TEST_F(TestCompaction, TestRowSetCompactionProceedWithNoBudgetingConstraints) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ // 1 as mem factor implies ~(2*1)MB memory requirements, ok for compaction to proceed
+ FLAGS_rowset_compaction_delta_memory_factor = 1;
+ TestRowSetCompactionWithOrWithoutBudgetingConstraints(false);
+}
+
+// This test adds workload of rowsets updates in order to
+// generate huge number of REDO deltas. Along with that, memory
+// budgeting constraints denoted by flags are enabled in order
+// to make sure that when rowset compaction is invoked, it takes
+// into consideration the amount of free memory left and based on
+// that skip the compaction because of lack of memory.
+TEST_F(TestCompaction, TestRowSetCompactionSkipWithBudgetingConstraints) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ // 1024000 mem factor implies ~(2*1024000)MB memory requirements forces to skip compaction
+ FLAGS_rowset_compaction_delta_memory_factor = 1024000;
+ TestRowSetCompactionWithOrWithoutBudgetingConstraints(true);
+}
+
TEST_F(TestCompaction, TestMemRowSetInput) {
// Create a memrowset with 10 rows and several updates.
shared_ptr<MemRowSet> mrs;
@@ -785,6 +877,55 @@ void TestCompaction::AddUpdateAndDelete(
AddExpectedReinsert(&row->undo_head, val);
}
+void TestCompaction::TestRowSetCompactionWithOrWithoutBudgetingConstraints(
+ bool budgeting_constraints_applied) {
+ FLAGS_rowset_compaction_memory_estimate_enabled = true;
+ FLAGS_rowset_compaction_ancient_delta_threshold_enabled = true;
+
+ // Ensure memory budgeting applies
+ FLAGS_rowset_compaction_estimate_min_deltas_size_mb = 0;
+
+ // size factor as 2 generates ~2MB memory size worth of deltas
+ GenHighMemConsumptionDeltas(2);
+
+ // Run rowset compaction.
+ StringVectorSink sink;
+ ScopedRegisterSink reg(&sink);
+ scoped_refptr<Trace> trace(new Trace);
+ Stopwatch sw;
+ sw.start();
+ {
+ ADOPT_TRACE(trace.get());
+ ASSERT_OK(tablet()->Compact(Tablet::COMPACT_NO_FLAGS));
+ }
+ sw.stop();
+ LOG(INFO) << Substitute("CompactRowSetsOp complete. Timing: $0 Metrics: $1",
+ sw.elapsed().ToString(),
+ trace->MetricsAsJSON());
+
+ if (budgeting_constraints_applied) {
+ ASSERT_STR_CONTAINS(JoinStrings(sink.logged_msgs(), "\n"),
+ "removed from compaction input due to memory
constraints");
+ } else {
+ ASSERT_STR_NOT_CONTAINS(JoinStrings(sink.logged_msgs(), "\n"),
+ "removed from compaction input due to memory
constraints");
+ }
+}
+
+void TestCompaction::GenHighMemConsumptionDeltas(const uint32_t size_factor) {
+ const uint32_t num_rowsets = 10;
+ const uint32_t num_rows_per_rowset = 2;
+ const uint32_t num_updates = 5000 * size_factor;
+
+ NO_FATALS(InsertOriginalRows(num_rowsets, num_rows_per_rowset));
+
+ // Mutate all of the rows.
+ for (int i = 1; i <= num_updates; i++) {
+ UpdateOriginalRowsNoFlush(num_rowsets, num_rows_per_rowset, i);
+ }
+ ASSERT_OK(tablet()->FlushAllDMSForTests());
+}
+
// Build several layers of overlapping rowsets with many ghost rows.
// Repeatedly merge all the generated RowSets until we are left with a single RowSet, then make
// sure that its history matches our expected history.