This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new b607633fd KUDU-3568 Fix compaction budgeting test by setting memory hard limit
b607633fd is described below

commit b607633fd3c2b676fbd2cbe57c44bddf818dc457
Author: Ashwani Raina <ara...@cloudera.com>
AuthorDate: Thu May 9 22:23:14 2024 +0530

    KUDU-3568 Fix compaction budgeting test by setting memory hard limit
    
    TestRowSetCompactionSkipWithBudgetingConstraints can fail if the node
    running the test has a large amount of memory. That happens because
    the test generates a few MBs worth of deltas, and that size is
    multiplied by a preset factor to ensure the result (i.e. the memory
    required to complete the rowset compaction) is very large, on the
    order of 200 GB per rowset.
    
    Even though the nodes running the test generally don't have that much
    physical memory, it is still possible to end up on a high-memory
    node, and on such a node the test might fail.
    
    The patch fixes that problem by deterministically ensuring that the
    compaction memory requirement is always higher than the memory hard
    limit. It does so as follows:
    1. Move the budgeting compaction tests out into a separate binary.
    2. This gives the flexibility to set the memory hard limit to suit
       the tests' needs. It is important to note that once a memory hard
       limit is set, it remains the same for all tests executed during
       the binary's lifecycle.
    3. Set the memory hard limit to 1 GB, which is enough to handle the
       compaction requirements of
       TestRowSetCompactionProceedWithNoBudgetingConstraints. For
       TestRowSetCompactionSkipWithBudgetingConstraints it is not enough,
       because the delta memory factor is set high enough for the
       estimate to exceed 1 GB (see the arithmetic sketched below). Both
       tests are now expected to succeed deterministically.
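
    A rough, self-contained sketch of the budgeting arithmetic at play
    (a simplification for illustration, not the actual Kudu
    implementation; the names kMB, kHardLimit, kDeltasSize and estimate
    are illustrative, while the constants mirror the test settings in
    the diff below):

    #include <cstdint>
    #include <initializer_list>
    #include <iostream>

    int main() {
      constexpr int64_t kMB = 1024LL * 1024;
      // 1 GB memory hard limit, as set in SetUpTestSuite().
      constexpr int64_t kHardLimit = 1024 * kMB;
      // The test workload generates ~2 MB worth of deltas.
      constexpr int64_t kDeltasSize = 2 * kMB;

      // Factor 1: estimate is ~2 MB, well under the limit -> proceed.
      // Factor 1024000: estimate is ~2 TB, far over it -> skip.
      for (const int64_t factor : {int64_t{1}, int64_t{1024000}}) {
        const int64_t estimate = kDeltasSize * factor;
        std::cout << "factor " << factor << ": "
                  << (estimate > kHardLimit ? "skip" : "proceed")
                  << std::endl;
      }
      return 0;
    }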
    
    Change-Id: I85d104e1d066507ce8e72a00cc5165cc4b85e48d
    Reviewed-on: http://gerrit.cloudera.org:8080/21416
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Alexey Serbin <ale...@apache.org>
---
 src/kudu/tablet/CMakeLists.txt             |   1 +
 src/kudu/tablet/compaction-highmem-test.cc | 220 +++++++++++++++++++++++++++++
 src/kudu/tablet/compaction-test.cc         | 143 -------------------
 3 files changed, 221 insertions(+), 143 deletions(-)

diff --git a/src/kudu/tablet/CMakeLists.txt b/src/kudu/tablet/CMakeLists.txt
index c48089ed8..71af1dab0 100644
--- a/src/kudu/tablet/CMakeLists.txt
+++ b/src/kudu/tablet/CMakeLists.txt
@@ -105,6 +105,7 @@ SET_KUDU_TEST_LINK_LIBS(tablet tablet_test_util)
 ADD_KUDU_TEST(all_types-scan-correctness-test NUM_SHARDS 8 PROCESSORS 2)
 ADD_KUDU_TEST(cfile_set-test)
 ADD_KUDU_TEST(compaction-test)
+ADD_KUDU_TEST(compaction-highmem-test)
 ADD_KUDU_TEST(compaction_policy-test DATA_FILES ycsb-test-rowsets.tsv)
 ADD_KUDU_TEST(composite-pushdown-test)
 ADD_KUDU_TEST(delta_compaction-test)
diff --git a/src/kudu/tablet/compaction-highmem-test.cc b/src/kudu/tablet/compaction-highmem-test.cc
new file mode 100644
index 000000000..43dbcb63d
--- /dev/null
+++ b/src/kudu/tablet/compaction-highmem-test.cc
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/common.pb.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/common/row_operations.pb.h"
+#include "kudu/common/schema.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/local_tablet_writer.h"
+#include "kudu/tablet/tablet-test-util.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/util/logging_test_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/trace.h"
+
+DECLARE_bool(rowset_compaction_enforce_preset_factor);
+DECLARE_bool(rowset_compaction_memory_estimate_enabled);
+DECLARE_bool(rowset_compaction_ancient_delta_threshold_enabled);
+DECLARE_double(rowset_compaction_delta_memory_factor);
+DECLARE_int64(memory_limit_hard_bytes);
+DECLARE_uint32(rowset_compaction_estimate_min_deltas_size_mb);
+
+namespace kudu {
+namespace tablet {
+
+class TestHighMemCompaction : public KuduRowSetTest {
+ public:
+  TestHighMemCompaction()
+      : KuduRowSetTest(CreateSchema()) {
+  }
+
+  static Schema CreateSchema() {
+    SchemaBuilder builder;
+    CHECK_OK(builder.AddKeyColumn("key", STRING));
+    CHECK_OK(builder.AddColumn("val", INT64));
+    CHECK_OK(builder.AddNullableColumn("nullable_val", INT32));
+    return builder.BuildWithoutIds();
+  }
+
+  Status InsertOrUpsertTestRows(RowOperationsPB::Type type,
+                                int64_t first_row,
+                                int64_t count,
+                                int32_t val) {
+    LocalTabletWriter writer(tablet().get(), &client_schema());
+    KuduPartialRow row(&client_schema());
+
+    for (int64_t i = first_row; i < first_row + count; i++) {
+      RETURN_NOT_OK(row.SetStringCopy("key", Substitute("hello $0", i)));
+      RETURN_NOT_OK(row.SetInt64("val", val));
+      if (type == RowOperationsPB::INSERT) {
+        RETURN_NOT_OK(writer.Insert(row));
+      } else if (type == RowOperationsPB::UPSERT) {
+        RETURN_NOT_OK(writer.Upsert(row));
+      } else {
+        return Status::InvalidArgument(
+            Substitute("unknown row operation type: $0", type));
+      }
+    }
+    return Status::OK();
+  }
+
+  void InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset) {
+    for (int64_t rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
+      ASSERT_OK(InsertOrUpsertTestRows(RowOperationsPB::INSERT,
+                                       rowset_id * rows_per_rowset,
+                                       rows_per_rowset,
+                                       /*val*/0));
+      ASSERT_OK(tablet()->Flush());
+    }
+    ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
+  }
+
+  void UpdateOriginalRowsNoFlush(int64_t num_rowsets, int64_t rows_per_rowset,
+      int32_t val) {
+    for (int64_t rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
+      ASSERT_OK(InsertOrUpsertTestRows(RowOperationsPB::UPSERT,
+                                       rowset_id * rows_per_rowset,
+                                       rows_per_rowset,
+                                       val));
+    }
+    ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
+  }
+
+  // Workload to generate large deltas for compaction.
+  // The method generates ~1 MB worth of deltas with a size_factor of 1.
+  // Callers can adjust the size_factor.
+  // For example, to generate ~5 MB, set size_factor to 5.
+  // Similarly, to generate ~35 MB, set it to 35.
+  void GenHighMemConsumptionDeltas(const uint32_t size_factor);
+
+  // Enables compaction memory budgeting and then runs rowset compaction.
+  // The caller can set constraints on the budget and check the results
+  // accordingly: if the constraints are hit, the compaction may be skipped.
+  void TestRowSetCompactionWithOrWithoutBudgetingConstraints(bool budgeting_constraints_applied);
+
+  static void SetUpTestSuite() {
+    // Keep the memory hard limit at 1 GB for deterministic results.
+    // The tests in this file require the memory hard limit to be a low
+    // value in order to ensure that the test expectations are met.
+    // Since the memory hard limit is initialized here to 1 GB, it remains
+    // the same throughout the lifecycle of this binary.
+    // It is important that no test in this file expects the memory hard
+    // limit to be set to the node's physical memory, i.e. all the tests
+    // assume that the memory hard limit is capped at 1 GB.
+    FLAGS_memory_limit_hard_bytes = 1024 * 1024 * 1024;
+
+    FLAGS_rowset_compaction_ancient_delta_threshold_enabled = true;
+    FLAGS_rowset_compaction_enforce_preset_factor = true;
+    FLAGS_rowset_compaction_memory_estimate_enabled = true;
+
+    // Ensure memory budgeting applies
+    FLAGS_rowset_compaction_estimate_min_deltas_size_mb = 0;
+  }
+};
+
+void TestHighMemCompaction::TestRowSetCompactionWithOrWithoutBudgetingConstraints(
+    bool budgeting_constraints_applied) {
+  // A size factor of 2 generates ~2 MB worth of deltas.
+  GenHighMemConsumptionDeltas(2);
+
+  // Run rowset compaction.
+  StringVectorSink sink;
+  ScopedRegisterSink reg(&sink);
+  scoped_refptr<Trace> trace(new Trace);
+  Stopwatch sw;
+  sw.start();
+  {
+    ADOPT_TRACE(trace.get());
+    ASSERT_OK(tablet()->Compact(Tablet::COMPACT_NO_FLAGS));
+  }
+  sw.stop();
+  LOG(INFO) << Substitute("CompactRowSetsOp complete. Timing: $0 Metrics: $1",
+                          sw.elapsed().ToString(),
+                          trace->MetricsAsJSON());
+
+  if (budgeting_constraints_applied) {
+    ASSERT_STR_CONTAINS(JoinStrings(sink.logged_msgs(), "\n"),
+                        "removed from compaction input due to memory constraints");
+  } else {
+    ASSERT_STR_NOT_CONTAINS(JoinStrings(sink.logged_msgs(), "\n"),
+                            "removed from compaction input due to memory constraints");
+  }
+}
+
+void TestHighMemCompaction::GenHighMemConsumptionDeltas(const uint32_t size_factor) {
+  constexpr const uint32_t num_rowsets = 10;
+  constexpr const uint32_t num_rows_per_rowset = 2;
+  const uint32_t num_updates = 5000 * size_factor;
+
+  NO_FATALS(InsertOriginalRows(num_rowsets, num_rows_per_rowset));
+
+  // Mutate all of the rows.
+  for (int i = 1; i <= num_updates; i++) {
+    UpdateOriginalRowsNoFlush(num_rowsets, num_rows_per_rowset, i);
+  }
+  ASSERT_OK(tablet()->FlushAllDMSForTests());
+}
+
+// This test runs a workload of rowset updates in order to
+// generate a modest number of REDO deltas. Along with that, the
+// memory budgeting constraints denoted by the flags are enabled in
+// order to make sure that when rowset compaction is invoked, it takes
+// into consideration the amount of free memory left and, since enough
+// memory is available, proceeds with the compaction.
+TEST_F(TestHighMemCompaction, TestRowSetCompactionProceedWithNoBudgetingConstraints) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // A memory factor of 1 implies ~(2*1) MB of memory required for all the
+  // rowsets, low enough for the compaction to proceed.
+  FLAGS_rowset_compaction_delta_memory_factor = 1;
+  TestRowSetCompactionWithOrWithoutBudgetingConstraints(false);
+}
+
+// This test runs a workload of rowset updates in order to
+// generate a huge number of REDO deltas. Along with that, the
+// memory budgeting constraints denoted by the flags are enabled in
+// order to make sure that when rowset compaction is invoked, it takes
+// into consideration the amount of free memory left and, due to the
+// lack of memory, skips the compaction.
+TEST_F(TestHighMemCompaction, TestRowSetCompactionSkipWithBudgetingConstraints) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // A memory factor of 1024000 implies ~(2*1024000) MB of memory required
+  // for all the rowsets, which forces the compaction to be skipped.
+  FLAGS_rowset_compaction_delta_memory_factor = 1024000;
+  TestRowSetCompactionWithOrWithoutBudgetingConstraints(true);
+}
+
+} // namespace tablet
+} // namespace kudu
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index a0d2c35d5..92d81a98b 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -41,7 +41,6 @@
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row.h"
 #include "kudu/common/row_changelist.h"
-#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/rowblock_memory.h"
 #include "kudu/common/rowid.h"
@@ -57,7 +56,6 @@
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/diskrowset.h"
 #include "kudu/tablet/local_tablet_writer.h"
@@ -73,7 +71,6 @@
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/util/env.h"
 #include "kudu/util/faststring.h"
-#include "kudu/util/logging_test_util.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/random.h"
@@ -82,7 +79,6 @@
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
-#include "kudu/util/trace.h"
 
 DEFINE_string(merge_benchmark_input_dir, "",
               "Directory to benchmark merge. The benchmark will merge "
@@ -95,12 +91,7 @@ DEFINE_uint32(merge_benchmark_num_rowsets, 3,
 DEFINE_uint32(merge_benchmark_num_rows_per_rowset, 500000,
               "Number of rowsets as input to the merge");
 
-DECLARE_bool(rowset_compaction_enforce_preset_factor);
-DECLARE_bool(rowset_compaction_memory_estimate_enabled);
-DECLARE_bool(rowset_compaction_ancient_delta_threshold_enabled);
-DECLARE_double(rowset_compaction_delta_memory_factor);
 DECLARE_string(block_manager);
-DECLARE_uint32(rowset_compaction_estimate_min_deltas_size_mb);
 
 using kudu::consensus::OpId;
 using kudu::log::LogAnchorRegistry;
@@ -140,50 +131,6 @@ class TestCompaction : public KuduRowSetTest {
     return builder.BuildWithoutIds();
   }
 
-  Status InsertOrUpsertTestRows(RowOperationsPB::Type type,
-                                int64_t first_row,
-                                int64_t count,
-                                int32_t val) {
-    LocalTabletWriter writer(tablet().get(), &client_schema());
-    KuduPartialRow row(&client_schema());
-
-    for (int64_t i = first_row; i < first_row + count; i++) {
-      RETURN_NOT_OK(row.SetStringCopy("key", Substitute("hello $0", i)));
-      RETURN_NOT_OK(row.SetInt64("val", val));
-      if (type == RowOperationsPB::INSERT) {
-        RETURN_NOT_OK(writer.Insert(row));
-      } else if (type == RowOperationsPB::UPSERT) {
-        RETURN_NOT_OK(writer.Upsert(row));
-      } else {
-        return Status::InvalidArgument(
-            Substitute("unknown row operation type: $0", type));
-      }
-    }
-    return Status::OK();
-  }
-
-  void InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset) {
-    for (int64_t rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
-      ASSERT_OK(InsertOrUpsertTestRows(RowOperationsPB::INSERT,
-                                       rowset_id * rows_per_rowset,
-                                       rows_per_rowset,
-                                       /*val*/0));
-      ASSERT_OK(tablet()->Flush());
-    }
-    ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
-  }
-
-  void UpdateOriginalRowsNoFlush(int64_t num_rowsets, int64_t rows_per_rowset,
-      int32_t val) {
-    for (int64_t rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
-      ASSERT_OK(InsertOrUpsertTestRows(RowOperationsPB::UPSERT,
-                                       rowset_id * rows_per_rowset,
-                                       rows_per_rowset,
-                                       val));
-    }
-    ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
-  }
-
   // Insert n_rows rows of data.
   // Each row is the tuple: (string key=hello <n*10 + delta>, val=<n>)
   void InsertRows(MemRowSet* mrs, int n_rows, int delta) {
@@ -607,18 +554,6 @@ class TestCompaction : public KuduRowSetTest {
   void AddExpectedReinsert(Mutation** current_head, int64_t val);
   void AddUpdateAndDelete(RowSet* rs, CompactionInputRow* row, int64_t row_id, int64_t val);
 
-  // Workload to generate large sized deltas for compaction.
-  // The method generates 1 MB size worth of deltas with size_factor as 1.
-  // Callers can adjust the size_factor.
-  // For example, to generate 5MB, set size_factor as 5.
-  // Similarly, to generate 35MB, set size_factor as 35.
-  void GenHighMemConsumptionDeltas(uint32_t size_factor);
-
-  // Enables compaction memory budgeting and then runs rowset compaction.
-  // Caller can set constraints on budget and expect the results accordingly.
-  // If constraints are applied, compaction may be skipped.
-  void TestRowSetCompactionWithOrWithoutBudgetingConstraints(bool budgeting_constraints_applied);
-
  protected:
   OpId op_id_;
 
@@ -633,34 +568,6 @@ class TestCompaction : public KuduRowSetTest {
   TabletMemTrackers mem_trackers_;
 };
 
-// This test adds workload of rowsets updates in order to
-// generate some number of REDO deltas. Along with that, memory
-// budgeting constraints denoted by flags are enabled in order
-// to make sure that when rowset compaction is invoked, it takes
-// into consideration the amount of free memory left and based on
-// that proceed with the compaction because of availability of memory.
-TEST_F(TestCompaction, TestRowSetCompactionProceedWithNoBudgetingConstraints) {
-  SKIP_IF_SLOW_NOT_ALLOWED();
-
-  // 1 as mem factor implies ~(2*1)MB memory requirements, ok for compaction to proceed
-  FLAGS_rowset_compaction_delta_memory_factor = 1;
-  TestRowSetCompactionWithOrWithoutBudgetingConstraints(false);
-}
-
-// This test adds workload of rowsets updates in order to
-// generate huge number of REDO deltas. Along with that, memory
-// budgeting constraints denoted by flags are enabled in order
-// to make sure that when rowset compaction is invoked, it takes
-// into consideration the amount of free memory left and based on
-// that skip the compaction because of lack of memory.
-TEST_F(TestCompaction, TestRowSetCompactionSkipWithBudgetingConstraints) {
-  SKIP_IF_SLOW_NOT_ALLOWED();
-
-  // 1024000 mem factor implies ~(2*1024000)MB memory requirements forces to skip compaction
-  FLAGS_rowset_compaction_delta_memory_factor = 1024000;
-  TestRowSetCompactionWithOrWithoutBudgetingConstraints(true);
-}
-
 TEST_F(TestCompaction, TestMemRowSetInput) {
   // Create a memrowset with 10 rows and several updates.
   shared_ptr<MemRowSet> mrs;
@@ -878,56 +785,6 @@ void TestCompaction::AddUpdateAndDelete(
   AddExpectedReinsert(&row->undo_head, val);
 }
 
-void TestCompaction::TestRowSetCompactionWithOrWithoutBudgetingConstraints(
-    bool budgeting_constraints_applied) {
-  FLAGS_rowset_compaction_memory_estimate_enabled = true;
-  FLAGS_rowset_compaction_enforce_preset_factor = true;
-  FLAGS_rowset_compaction_ancient_delta_threshold_enabled = true;
-
-  // Ensure memory budgeting applies
-  FLAGS_rowset_compaction_estimate_min_deltas_size_mb = 0;
-
-  // size factor as 2 generates ~2MB memory size worth of deltas
-  GenHighMemConsumptionDeltas(2);
-
-  // Run rowset compaction.
-  StringVectorSink sink;
-  ScopedRegisterSink reg(&sink);
-  scoped_refptr<Trace> trace(new Trace);
-  Stopwatch sw;
-  sw.start();
-  {
-    ADOPT_TRACE(trace.get());
-    ASSERT_OK(tablet()->Compact(Tablet::COMPACT_NO_FLAGS));
-  }
-  sw.stop();
-  LOG(INFO) << Substitute("CompactRowSetsOp complete. Timing: $0 Metrics: $1",
-                            sw.elapsed().ToString(),
-                            trace->MetricsAsJSON());
-
-  if (budgeting_constraints_applied) {
-    ASSERT_STR_CONTAINS(JoinStrings(sink.logged_msgs(), "\n"),
-                        "removed from compaction input due to memory constraints");
-  } else {
-    ASSERT_STR_NOT_CONTAINS(JoinStrings(sink.logged_msgs(), "\n"),
-                            "removed from compaction input due to memory constraints");
-  }
-}
-
-void TestCompaction::GenHighMemConsumptionDeltas(const uint32_t size_factor) {
-  const uint32_t num_rowsets = 10;
-  const uint32_t num_rows_per_rowset = 2;
-  const uint32_t num_updates = 5000 * size_factor;
-
-  NO_FATALS(InsertOriginalRows(num_rowsets, num_rows_per_rowset));
-
-  // Mutate all of the rows.
-  for (int i = 1; i <= num_updates; i++) {
-    UpdateOriginalRowsNoFlush(num_rowsets, num_rows_per_rowset, i);
-  }
-  ASSERT_OK(tablet()->FlushAllDMSForTests());
-}
-
 // Build several layers of overlapping rowsets with many ghost rows.
 // Repeatedly merge all the generated RowSets until we are left with a single RowSet, then make sure that its history matches our expected history.
