This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch branch-1.18.x
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 19fb1851ed3f91724ccb6d9d27511e89d49dee89
Author: Ashwani Raina <[email protected]>
AuthorDate: Tue Nov 19 19:50:42 2024 +0530

    [compaction] Add memory tracking for better OOM issues triaging
    
    This patch adds memory accounting for major delta compaction.
    The memory usage is tracked under a newly added MemTracker.
    In addition, throttled warning messages are logged when the
    process memory usage goes beyond a configurable threshold.
    
    A unit test is added to verify that the warning messages are logged
    when the memory threshold (lowered for the test) is crossed.
    
    Change-Id: Ic2037582d433730884212e83359bb75bad0d5394
    Reviewed-on: http://gerrit.cloudera.org:8080/22079
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
    (cherry picked from commit d405ad33080cc5b203e3789363a202d472aca795)
    Reviewed-on: http://gerrit.cloudera.org:8080/22415
    Reviewed-by: Ashwani Raina <[email protected]>
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Abhishek Chennaka <[email protected]>
---
 src/kudu/common/rowblock_memory.h          |  3 +-
 src/kudu/tablet/compaction-highmem-test.cc | 34 +++++++++++++++
 src/kudu/tablet/delta_compaction.cc        | 68 +++++++++++++++++++++++++++---
 src/kudu/tablet/delta_compaction.h         | 12 ++++++
 src/kudu/tablet/delta_store.h              | 10 +++++
 src/kudu/tablet/deltafile.cc               |  1 +
 src/kudu/util/process_memory.cc            | 16 +++++++
 src/kudu/util/process_memory.h             |  4 ++
 8 files changed, 141 insertions(+), 7 deletions(-)

diff --git a/src/kudu/common/rowblock_memory.h b/src/kudu/common/rowblock_memory.h
index 5a11b4b70..12fc5978c 100644
--- a/src/kudu/common/rowblock_memory.h
+++ b/src/kudu/common/rowblock_memory.h
@@ -24,6 +24,7 @@
 
 namespace kudu {
 
+constexpr size_t kInitRowBlockMemSize = 32 * 1024;  // Initial arena size of RowBlockMemory.
 class RowBlockRefCounted;
 
 // Handles the memory allocated alongside a RowBlock for variable-length
@@ -41,7 +42,7 @@ class RowBlockRefCounted;
 struct RowBlockMemory {
   Arena arena;
 
-  explicit RowBlockMemory(int arena_size = 32 * 1024) : arena(arena_size) {}
+  explicit RowBlockMemory(int arena_size = kInitRowBlockMemSize) : arena(arena_size) {}
   ~RowBlockMemory() { Reset(); }
 
   void Reset() {
diff --git a/src/kudu/tablet/compaction-highmem-test.cc b/src/kudu/tablet/compaction-highmem-test.cc
index 43dbcb63d..7bb0b3e1b 100644
--- a/src/kudu/tablet/compaction-highmem-test.cc
+++ b/src/kudu/tablet/compaction-highmem-test.cc
@@ -34,6 +34,7 @@
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/local_tablet_writer.h"
+#include "kudu/tablet/rowset.h"
 #include "kudu/tablet/tablet-test-util.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/util/logging_test_util.h"
@@ -46,6 +47,7 @@
 DECLARE_bool(rowset_compaction_enforce_preset_factor);
 DECLARE_bool(rowset_compaction_memory_estimate_enabled);
 DECLARE_bool(rowset_compaction_ancient_delta_threshold_enabled);
+DECLARE_double(memory_limit_compact_usage_warn_threshold_percentage);
 DECLARE_double(rowset_compaction_delta_memory_factor);
 DECLARE_int64(memory_limit_hard_bytes);
 DECLARE_uint32(rowset_compaction_estimate_min_deltas_size_mb);
@@ -123,6 +125,9 @@ class TestHighMemCompaction : public KuduRowSetTest {
   // If constraints are applied, compaction may be skipped.
   void TestRowSetCompactionWithOrWithoutBudgetingConstraints(bool budgeting_constraints_applied);
 
+  // Tests that the expected warning is logged when major compaction crosses the memory threshold.
+  void TestMajorCompactionCrossingMemoryThreshold();
+
   static void SetUpTestSuite() {
     // Keep the memory hard limit as 1GB for deterministic results.
     // The tests in this file have a requirement of memory hard limit to be of
@@ -172,6 +177,20 @@ void TestHighMemCompaction::TestRowSetCompactionWithOrWithoutBudgetingConstraint
   }
 }
 
+void TestHighMemCompaction::TestMajorCompactionCrossingMemoryThreshold() {
+  // A size factor of 2 generates ~2MB worth of deltas.
+  GenHighMemConsumptionDeltas(2);
+
+  // Run major delta compaction.
+  StringVectorSink sink;
+  ScopedRegisterSink reg(&sink);
+  scoped_refptr<Trace> trace(new Trace);
+  ADOPT_TRACE(trace.get());
+  ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION));
+  ASSERT_STR_CONTAINS(JoinStrings(sink.logged_msgs(), "\n"),
+                      "Beyond hard memory limit of");
+}
+
 void TestHighMemCompaction::GenHighMemConsumptionDeltas(const uint32_t size_factor) {
   constexpr const uint32_t num_rowsets = 10;
   constexpr const uint32_t num_rows_per_rowset = 2;
@@ -216,5 +235,20 @@ TEST_F(TestHighMemCompaction, TestRowSetCompactionSkipWithBudgetingConstraints)
   TestRowSetCompactionWithOrWithoutBudgetingConstraints(true);
 }
 
+TEST_F(TestHighMemCompaction, TestMajorCompactionMemoryPressure) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // Approximate memory consumed by delta compaction during the course of this test.
+  // Total consumption would always exceed this usage. Since we want to be
+  // certain that warning threshold limit is crossed, this value is kept low.
+  const int64_t compaction_mem_usage_approx = 3 * 1024 * 1024;
+
+  // Lower the warning threshold below the expected usage so that the check trips.
+  FLAGS_memory_limit_compact_usage_warn_threshold_percentage =
+      static_cast<double>(compaction_mem_usage_approx * 100) / FLAGS_memory_limit_hard_bytes;
+
+  TestMajorCompactionCrossingMemoryThreshold();
+}
+
 } // namespace tablet
 } // namespace kudu
diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc
index dc612ef1c..94d7ae1cc 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -17,7 +17,6 @@
 
 #include "kudu/tablet/delta_compaction.h"
 
-#include <cstdint>
 #include <map>
 #include <ostream>
 #include <string>
@@ -25,6 +24,7 @@
 #include <utility>
 #include <vector>
 
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 
 #include "kudu/common/generic_iterators.h"
@@ -40,6 +40,7 @@
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/cfile_set.h"
@@ -52,14 +53,15 @@
 #include "kudu/tablet/mutation.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/rowset_metadata.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/memory/arena.h"
+#include "kudu/util/process_memory.h"
 #include "kudu/util/trace.h"
 
-namespace kudu {
-class Arena;
-}
+DECLARE_int32(memory_limit_warn_threshold_percentage);
 
 using kudu::fs::BlockCreationTransaction;
-using kudu::fs::BlockManager;
 using kudu::fs::CreateBlockOptions;
 using kudu::fs::IOContext;
 using kudu::fs::WritableBlock;
@@ -98,9 +100,14 @@ MajorDeltaCompaction::MajorDeltaCompaction(
       undo_delta_mutations_written_(0),
       state_(kInitialized) {
   CHECK(!column_ids_.empty());
+  parent_tracker_ = MemTracker::FindOrCreateGlobalTracker(-1, "major_compaction");
+  tracker_ = MemTracker::CreateTracker(-1, Substitute("major_compaction:$0",
+                                                      tablet_id_),
+                                       parent_tracker_);
 }
 
 MajorDeltaCompaction::~MajorDeltaCompaction() {
+  tracker_->Release(tracker_->consumption());
 }
 
 string MajorDeltaCompaction::ColumnNamesToString() const {
@@ -117,6 +124,24 @@ string MajorDeltaCompaction::ColumnNamesToString() const {
   return JoinStrings(col_names, ", ");
 }
 
+// Log warning messages if the memory consumption has exceeded a certain threshold.
+void MajorDeltaCompaction::MemoryExceededWarnMsgs() {
+  if (process_memory::OverHardLimitThreshold()) {
+    // Format lazily: the throttled stream is evaluated only when the message is logged.
+    KLOG_EVERY_N_SECS(WARNING, 1) << StringPrintf(
+        "Beyond hard memory limit of %ld with current consumption at %ld. "
+        "MajorDeltaCompaction ops consumption: tablet-%s %ld, total %ld.",
+        process_memory::HardLimit(), process_memory::CurrentConsumption(),
+        tablet_id_.c_str(), tracker_->consumption(), parent_tracker_->consumption())
+        << THROTTLE_MSG;
+  }
+}
+
+void MajorDeltaCompaction::UpdateMemTracker(int64_t mem_consumed) {
+  tracker_->Consume(mem_consumed);
+  MemoryExceededWarnMsgs();
+}
+
 Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
   CHECK_EQ(state_, kInitialized);
 
@@ -134,7 +159,10 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
   RETURN_NOT_OK(delta_iter_->Init(&spec));
   RETURN_NOT_OK(delta_iter_->SeekToOrdinal(0));
 
-  RowBlockMemory mem(32 * 1024);
+  RowBlockMemory mem(kInitRowBlockMemSize);
+  // Update memory tracker with initialized arena memory footprint.
+  UpdateMemTracker(mem.arena.memory_footprint());
+
   RowBlock block(&partial_schema_, kRowsPerBlock, &mem);
 
   DVLOG(1) << "Applying deltas and rewriting columns (" << partial_schema_.ToString() << ")";
@@ -145,6 +173,8 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
   MvccSnapshot snap = MvccSnapshot::CreateSnapshotIncludingAllOps();
   while (old_base_data_rwise->HasNext()) {
 
+    // Update memory tracker with arena memory footprint that is to be released.
+    UpdateMemTracker(-static_cast<int64_t>(mem.arena.memory_footprint()));
     // 1) Get the next batch of base data for the columns we're compacting.
     mem.Reset();
     RETURN_NOT_OK(old_base_data_rwise->NextBlock(&block));
@@ -152,9 +182,19 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
 
     // 2) Fetch all the REDO mutations.
     vector<Mutation *> redo_mutation_block(kRowsPerBlock, static_cast<Mutation *>(nullptr));
+    size_t delta_blocks_mem_before_prepare_batch = delta_iter_->memory_footprint();
     RETURN_NOT_OK(delta_iter_->PrepareBatch(n, DeltaIterator::PREPARE_FOR_COLLECT));
+
+    // Update memory tracker with memory allocated for delta blocks.
+    UpdateMemTracker(
+        static_cast<int64_t>(delta_iter_->memory_footprint()) -
+        static_cast<int64_t>(delta_blocks_mem_before_prepare_batch));
+
     RETURN_NOT_OK(delta_iter_->CollectMutations(&redo_mutation_block, block.arena()));
 
+    // Update memory tracker with memory allocated for mutations.
+    UpdateMemTracker(mem.arena.memory_footprint());
+
     // 3) Write new UNDO mutations for the current block. The REDO mutations
     //    are written out in step 6.
     vector<CompactionInputRow> input_rows(block.nrows());
@@ -179,6 +219,7 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
       DVLOG(3) << "MDC Input Row - RowId: " << row_id << " "
                << CompactionInputRowToString(*input_row);
 
+      size_t arena_memory_footprint_before_mutations = mem.arena.memory_footprint();
       RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(snap,
                                                    *input_row,
                                                    &mem.arena,
@@ -186,6 +227,12 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
                                                    &dst_row,
                                                    &new_undos_head,
                                                    &new_redos_head));
+      // Update memory tracker to account for memory allocation for
+      // prepared deltas, in-memory blocks and in-memory change lists.
+      UpdateMemTracker(
+          static_cast<int64_t>(mem.arena.memory_footprint()) -
+          static_cast<int64_t>(arena_memory_footprint_before_mutations));
+
       RemoveAncientUndos(history_gc_opts_,
                          new_redos_head,
                          &new_undos_head);
@@ -208,6 +255,9 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
     // 4) Write the new base data.
     RETURN_NOT_OK(base_data_writer_->AppendBlock(block));
 
+    // Update memory tracker with arena memory footprint that is to be released.
+    UpdateMemTracker(-static_cast<int64_t>(mem.arena.memory_footprint()));
+
     // 5) Remove the columns that we've done our major REDO delta compaction on
     //    from this delta flush, except keep all the delete and reinsert
     //    mutations.
@@ -215,6 +265,9 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
     vector<DeltaKeyAndUpdate> out;
     RETURN_NOT_OK(delta_iter_->FilterColumnIdsAndCollectDeltas(column_ids_, &out, &mem.arena));
 
+    // Update memory tracker with arena memory footprint for reinsert and delete mutations.
+    UpdateMemTracker(mem.arena.memory_footprint());
+
     // We only create a new redo delta file if we need to.
     if (!out.empty() && !new_redo_delta_writer_) {
       RETURN_NOT_OK(OpenRedoDeltaFileWriter());
@@ -344,6 +397,9 @@ Status MajorDeltaCompaction::Compact(const IOContext* io_context) {
   TRACE_COUNTER_INCREMENT("delete_count", stats.delete_count);
   TRACE_COUNTER_INCREMENT("reinsert_count", stats.reinsert_count);
   TRACE_COUNTER_INCREMENT("update_count", stats.update_count);
+
+  // Update stats with peak memory consumption.
+  TRACE_COUNTER_INCREMENT("peak_mem_usage", tracker_->peak_consumption());
   VLOG(1) << Substitute("Finished major delta compaction of columns $0. "
                         "Compacted $1 delta files. Overall stats: $2",
                         ColumnNamesToString(),
diff --git a/src/kudu/tablet/delta_compaction.h b/src/kudu/tablet/delta_compaction.h
index 9bc5caa50..36725a3d2 100644
--- a/src/kudu/tablet/delta_compaction.h
+++ b/src/kudu/tablet/delta_compaction.h
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <string>
 #include <vector>
@@ -30,6 +31,7 @@
 namespace kudu {
 
 class FsManager;
+class MemTracker;
 
 namespace fs {
 struct IOContext;
@@ -97,6 +99,13 @@ class MajorDeltaCompaction {
   // deltas need to be written back into a delta file.
   Status FlushRowSetAndDeltas(const fs::IOContext* io_context);
 
+  // Logs a throttled warning if process memory is above the compaction warn threshold.
+  void MemoryExceededWarnMsgs();
+
+  // Adds 'mem_consumed' bytes (negative values release memory) to this tablet's
+  // compaction memory tracker, then logs a warning if memory exceeds the threshold.
+  void UpdateMemTracker(int64_t mem_consumed);
+
   FsManager* const fs_manager_;
 
   // TODO: doc me
@@ -143,6 +152,9 @@ class MajorDeltaCompaction {
     kFinished = 2,
   };
   State state_;
+
+  std::shared_ptr<MemTracker> parent_tracker_;
+  std::shared_ptr<MemTracker> tracker_;
 };
 
 } // namespace tablet
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index 5d81b11f2..686d40086 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -475,6 +475,16 @@ class DeltaPreparer : public PreparedDeltas {
     return deltas_selected_;
   }
 
+  // Returns an estimate of the memory held by the currently prepared deltas: the
+  // PreparedDelta entries themselves plus the size of each delta's 'val'.
+  int64_t delta_blocks_mem_size() const {
+    int64_t total = prepared_deltas_.size() * sizeof(PreparedDelta);
+    for (const auto& delta : prepared_deltas_) {
+      total += delta.val.size();
+    }
+    return total;
+  }
+
   void set_deltas_selected(int64_t deltas_selected) {
     // NOTE: it's possible that the iterator's number of 'deltas_selected'
     // hasn't changed if no deltas were selected since last being called, hence
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index a99533eb7..7a865ab6f 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -675,6 +675,7 @@ Status DeltaFileIterator<Type>::AddDeltas(rowid_t start_row, rowid_t stop_row) {
       }
     }
   }
+  delta_blocks_mem_size_ += preparer_.delta_blocks_mem_size();
 
   return Status::OK();
 }
diff --git a/src/kudu/util/process_memory.cc b/src/kudu/util/process_memory.cc
index a100e4a61..c379f9413 100644
--- a/src/kudu/util/process_memory.cc
+++ b/src/kudu/util/process_memory.cc
@@ -66,6 +66,12 @@ DEFINE_int32(memory_limit_warn_threshold_percentage, 98,
              "consume before WARNING level messages are periodically logged.");
 TAG_FLAG(memory_limit_warn_threshold_percentage, advanced);
 
+// TODO(araina): Remove this flag when compaction logic starts honoring hard limit memory setting.
+DEFINE_double(memory_limit_compact_usage_warn_threshold_percentage, 105.0,
+              "Percentage of the hard memory limit that this daemon may consume before WARNING "
+              "level messages are periodically logged during an ongoing compaction op.");
+TAG_FLAG(memory_limit_compact_usage_warn_threshold_percentage, experimental);
+
 #ifdef TCMALLOC_ENABLED
 DEFINE_bool(disable_tcmalloc_gc_by_memory_tracker_for_testing, false,
             "For testing only! Whether to disable tcmalloc GC by memory tracker.");
@@ -228,6 +234,16 @@ int64_t HardLimit() {
   return g_hard_limit;
 }
 
+bool OverHardLimitThreshold() {
+  InitLimits();
+  // The flag is a percentage of the hard limit and may exceed 100: the default of 105
+  // means "warn once consumption is 5% above the hard limit".
+  const int64_t over_hard_limit_threshold = static_cast<int64_t>(
+      static_cast<double>(g_hard_limit) *
+      FLAGS_memory_limit_compact_usage_warn_threshold_percentage / 100);
+  return CurrentConsumption() > over_hard_limit_threshold;
+}
+
 int64_t SoftLimit() {
   InitLimits();
   return g_soft_limit;
diff --git a/src/kudu/util/process_memory.h b/src/kudu/util/process_memory.h
index 048b8186b..548d5f218 100644
--- a/src/kudu/util/process_memory.h
+++ b/src/kudu/util/process_memory.h
@@ -47,6 +47,10 @@ int64_t MaxMemoryAvailable();
 // Return the configured hard limit for the process.
 int64_t HardLimit();
 
+// Return true if the process currently consumes more than
+// --memory_limit_compact_usage_warn_threshold_percentage percent of the hard limit.
+bool OverHardLimitThreshold();
+
 // Return the configured soft limit for the process.
 int64_t SoftLimit();
 

Reply via email to