This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 80bc33c7e [compaction] Release delta blocks memory early to avoid more consumption.
80bc33c7e is described below

commit 80bc33c7e1a126c083aacf7735d979cfbcb6b981
Author: Ashwani Raina <[email protected]>
AuthorDate: Tue Jan 14 15:10:26 2025 +0530

    [compaction] Release delta blocks memory early to avoid more consumption.
    
    MajorDelta compaction relies on object scope to release the memory
    used to store delta blocks that are read during the PrepareBatch phase
    of the compaction operation. The delta blocks are needed only until
    the REDO mutations have been applied to the base data and the
    corresponding UNDO mutation has been created for each REDO mutation.
    
    This patch releases the memory as soon as the compaction has finished
    using the delta blocks, which avoids unnecessary memory pressure for
    the rest of the compaction operation. This is also a precursor to the
    MajorDelta chunked compaction work.
    
    Change-Id: I0c1eadcb8ad735fc92ac77f554934e606e4119ae
    Reviewed-on: http://gerrit.cloudera.org:8080/22338
    Tested-by: Kudu Jenkins
    Reviewed-by: Attila Bukor <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/tablet/delta_compaction.cc      | 2 ++
 src/kudu/tablet/delta_iterator_merger.cc | 7 +++++++
 src/kudu/tablet/delta_iterator_merger.h  | 1 +
 src/kudu/tablet/delta_store.h            | 6 ++++++
 src/kudu/tablet/deltafile.cc             | 2 +-
 src/kudu/tablet/deltafile.h              | 7 +++++++
 src/kudu/tablet/deltamemstore.h          | 4 ++++
 7 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc
index d3d94ca4c..9afe2e527 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -288,6 +288,8 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
     redo_delta_mutations_written_ += out.size();
     nrows += n;
   }
+  // Release memory used to hold delta blocks data read during batch preparation.
+  RETURN_NOT_OK(delta_iter_->FreeDeltaBlocks());
 
   auto bm = fs_manager_->block_manager();
   unique_ptr<BlockCreationTransaction> transaction = bm->NewCreationTransaction();
diff --git a/src/kudu/tablet/delta_iterator_merger.cc b/src/kudu/tablet/delta_iterator_merger.cc
index b9968bea6..261e3a37d 100644
--- a/src/kudu/tablet/delta_iterator_merger.cc
+++ b/src/kudu/tablet/delta_iterator_merger.cc
@@ -120,6 +120,13 @@ Status DeltaIteratorMerger::FilterColumnIdsAndCollectDeltas(
   return Status::OK();
 }
 
+Status DeltaIteratorMerger::FreeDeltaBlocks() {
+  for (const auto& iter : iters_) {
+    RETURN_NOT_OK(iter->FreeDeltaBlocks());
+  }
+  return Status::OK();
+}
+
 bool DeltaIteratorMerger::HasNext() const {
   for (const unique_ptr<DeltaIterator>& iter : iters_) {
     if (iter->HasNext()) {
diff --git a/src/kudu/tablet/delta_iterator_merger.h b/src/kudu/tablet/delta_iterator_merger.h
index 7653a306f..0d9f10eb9 100644
--- a/src/kudu/tablet/delta_iterator_merger.h
+++ b/src/kudu/tablet/delta_iterator_merger.h
@@ -80,6 +80,7 @@ class DeltaIteratorMerger : public DeltaIterator {
   Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
                                          std::vector<DeltaKeyAndUpdate>* out,
                                          Arena* arena) override;
+  Status FreeDeltaBlocks() override;
 
   bool HasNext() const override;
 
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index 686d40086..006673edd 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -363,6 +363,12 @@ class DeltaIterator : public PreparedDeltas {
   };
   virtual Status PrepareBatch(size_t nrows, int prepare_flags) = 0;
 
+  // Frees up delta blocks that are read during PrepareBatch phase.
+  // This must be called after PrepareBatch phase is complete and
+  // all the mutations from each delta have been collected and applied,
+  // i.e. after CollectMutations and FilterColumnIdsAndCollectDeltas.
+  virtual Status FreeDeltaBlocks() = 0;
+
   // Returns true if there are any more rows left in this iterator.
   virtual bool HasNext() const = 0;
 
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index 7a865ab6f..b2ce3ba59 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -503,7 +503,7 @@ Status DeltaFileIterator<Type>::ReadCurrentBlockOntoQueue() {
                                   dblk_ptr, cache_blocks_, &block));
 
   // The data has been successfully read. Finish creating the decoder.
-  PreparedDeltaBlock pdb(dblk_ptr, std::move(block),  0);
+  PreparedDeltaBlock pdb(dblk_ptr, std::move(block), 0);
 
   // Decode the block.
   RETURN_NOT_OK_PREPEND(pdb.decoder.ParseHeader(),
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 2146ee39f..cc18f4ab6 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -266,6 +266,13 @@ class DeltaFileIterator : public DeltaIterator {
                                          std::vector<DeltaKeyAndUpdate>* out,
                                          Arena* arena) override;
 
+  // Release memory that holds delta blocks data read during batch preparation.
+  Status FreeDeltaBlocks() override {
+    delta_blocks_.clear();
+    delta_blocks_mem_size_ = 0;
+    return Status::OK();
+  }
+
   std::string ToString() const override;
 
   bool HasNext() const override;
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index 736c635f2..5613524af 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -237,6 +237,10 @@ class DMSIterator : public DeltaIterator {
                                          std::vector<DeltaKeyAndUpdate>* out,
                                          Arena* arena) override;
 
+  Status FreeDeltaBlocks() override {
+    return Status::NotSupported("No delta blocks queue is maintained for DMS.");
+  }
+
   std::string ToString() const override;
 
   bool HasNext() const override;

Reply via email to