This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 4a857ef4a46e5bb1f28b3b8f096e7f0ca006bdc2
Author: Martin Zink <[email protected]>
AuthorDate: Wed Mar 25 00:53:26 2026 +0100

    MINIFICPP-2696 Add option for optimizationForSmallDB in DatabaseConte…
    
    I went ahead and did some testing so that we can set a sensible default for this new configuration value. Based on my testing, the processing overhead seems negligible; unfortunately, the memory gains are also not what one would expect (they are flow-specific).
    
    I’ve created a bunch of flows and tried them with various cache sizes.
    
    - Simple GenerateFlowFile → LogAttribute 5MB Text (random)
     - processing throughput remained the same while achieving a 10% lower memory footprint on average, but the memory usage was not consistent
    - TailFile(30k lines) → MergeFile → LogAttribute, with GetFile → 
LogAttribute (to increase memory usage)
     - Processing time of TailFile remained within run-to-run variance, with a 20-30% lower memory footprint
    
    I’ve also noticed that the compaction time and periodic https://man7.org/linux/man-pages/man3/malloc_trim.3.html calls make the memory footprint even smaller (much more noticeably in some cases)
    
    Methodology
    Processing time was measured via log messages, and memory footprint via the Prometheus AgentStatus metrics.
    The baseline was setting the new property to an empty string (which effectively mimics the current behaviour of main).
    
    **I couldn’t measure any performance loss, but I only tried a small set of relatively simple flows. Since this PR changes the default behavior, it’s important that we emphasise this change, and how to disable it, in the release notes.**
    
    Closes #2081
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 CONFIGURE.md                                       |  14 ++
 conf/minifi.properties.in                          |   1 +
 .../rocksdb-repos/DatabaseContentRepository.cpp    |  40 +++-
 extensions/rocksdb-repos/database/OpenRocksDb.cpp  |  26 +--
 extensions/rocksdb-repos/database/OpenRocksDb.h    |   2 +
 extensions/rocksdb-repos/database/RocksDbUtils.h   |  12 ++
 .../tests/DBContentRepositoryTests.cpp             |  41 ++++
 libminifi/src/Configuration.cpp                    |   1 +
 .../state/nodes/RepositoryMetricsSourceStore.cpp   |   6 +
 libminifi/test/libtest/unit/ProvenanceTestHelper.h |   4 +-
 libminifi/test/libtest/unit/TestUtils.cpp          |  39 ++++
 libminifi/test/libtest/unit/TestUtils.h            |  18 +-
 libminifi/test/unit/LogMetricsPublisherTests.cpp   | 206 +++++++++------------
 libminifi/test/unit/MetricsTests.cpp               |   4 +-
 .../minifi-cpp/core/RepositoryMetricsSource.h      |   2 +
 .../include/minifi-cpp/properties/Configuration.h  |   1 +
 16 files changed, 275 insertions(+), 142 deletions(-)

diff --git a/CONFIGURE.md b/CONFIGURE.md
index 62e8a8768..40cb44b7b 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -41,6 +41,7 @@
   - [Configuring Repositories](#configuring-repositories)
   - [Configuring Volatile Repositories](#configuring-volatile-repositories)
   - [Configuring Repository storage 
locations](#configuring-repository-storage-locations)
+  - [Configuring cache size for rocksdb content 
repository](#configuring-cache-size-for-rocksdb-content-repository)
   - [Configuring compression for rocksdb 
database](#configuring-compression-for-rocksdb-database)
   - [Configuring compaction for rocksdb 
database](#configuring-compaction-for-rocksdb-database)
   - [Configuring synchronous or asynchronous writes for RocksDB content 
repository](#configuring-synchronous-or-asynchronous-writes-for-rocksdb-content-repository)
@@ -674,6 +675,19 @@ In a Filesystem Hierarchy Standard (FHS) installation 
(from an RPM package), the
     
nifi.flowfile.repository.directory.default=/var/lib/nifi-minifi-cpp/flowfile_repository
     
nifi.database.content.repository.directory.default=/var/lib/nifi-minifi-cpp/content_repository
 
+### Configuring cache size for rocksdb content repository
+
+The RocksDB content repository uses a size-limited LRU cache (8 MB by default) to keep its memory usage in check. The cache size can be configured using the following property.
+This should reduce memory usage, but may cause a small processing overhead.
+
+    # in minifi.properties
+    nifi.database.content.repository.optimize.for.small.db.cache.size=8 MB
+
+You can disable the cache limitation by setting the property to an empty value.
+
+    # in minifi.properties
+    nifi.database.content.repository.optimize.for.small.db.cache.size=
+
 
 ### Configuring compression for rocksdb database
 
diff --git a/conf/minifi.properties.in b/conf/minifi.properties.in
index 636632e8c..afcb7f9d9 100644
--- a/conf/minifi.properties.in
+++ b/conf/minifi.properties.in
@@ -52,6 +52,7 @@ nifi.content.repository.class.name=DatabaseContentRepository
 ## Relates to the internal workings of the rocksdb backend
 # nifi.flowfile.repository.rocksdb.compaction.period=2 min
 # nifi.database.content.repository.rocksdb.compaction.period=2 min
+# nifi.database.content.repository.optimize.for.small.db.cache.size=8 MB
 
 # setting this value to "0" enables synchronous deletion
 # nifi.database.content.repository.purge.period = 1 sec
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp 
b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index ddb7b0dc0..80b3a832e 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -58,21 +58,55 @@ bool DatabaseContentRepository::initialize(const 
std::shared_ptr<minifi::Configu
 
   setCompactionPeriod(configuration);
 
-  auto set_db_opts = [encrypted_env] 
(minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
+  const auto cache_size = 
configuration->get(Configure::nifi_dbcontent_optimize_for_small_db_cache_size)
+      | utils::orElse([] { return std::make_optional<std::string>("8 MB"); })
+      | utils::andThen([](const auto& cache_size_str) -> 
std::optional<uint64_t> {
+        if (cache_size_str.empty()) {
+          return std::nullopt;
+        }
+        return parsing::parseDataSize(cache_size_str) | 
utils::orThrow(fmt::format("{} has invalid format {}", 
Configure::nifi_dbcontent_optimize_for_small_db_cache_size, cache_size_str));
+      });
+
+  std::shared_ptr<rocksdb::Cache> cache = nullptr;
+  std::shared_ptr<rocksdb::WriteBufferManager> wbm = nullptr;
+  if (cache_size) {
+    cache = rocksdb::NewLRUCache(*cache_size);
+    wbm = std::make_shared<rocksdb::WriteBufferManager>(0, cache);
+    logger_->log_trace("Using {} sized cache for DatabaseContentRepository", 
*cache_size);
+  } else {
+    logger_->log_trace("Cache limitation disabled for 
DatabaseContentRepository");
+  }
+
+  auto set_db_opts = [encrypted_env, &cache, &wbm] 
(minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
     minifi::internal::setCommonRocksDbOptions(db_opts);
     if (encrypted_env) {
       db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), 
EncryptionEq{});
     } else {
       db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
     }
+    db_opts.optimizeForSmallDb(cache, wbm);
   };
-  auto set_cf_opts = [&configuration] (rocksdb::ColumnFamilyOptions& cf_opts) {
+  auto set_cf_opts = [&configuration, &cache] (rocksdb::ColumnFamilyOptions& 
cf_opts) {
     cf_opts.OptimizeForPointLookup(4);
     cf_opts.merge_operator = std::make_shared<StringAppender>();
     cf_opts.max_successive_merges = 0;
-    if (auto compression_type = 
minifi::internal::readConfiguredCompressionType(configuration, 
Configure::nifi_content_repository_rocksdb_compression)) {
+    cf_opts.max_write_buffer_number = 2;
+    cf_opts.write_buffer_size = 4_MB;
+    if (const auto compression_type = 
internal::readConfiguredCompressionType(configuration, 
Configure::nifi_content_repository_rocksdb_compression)) {
       cf_opts.compression = *compression_type;
     }
+    if (cache) {
+      // Start from the table options OptimizeForPointLookup() installed (bloom filter policy, data-block hash index); a default-constructed BlockBasedTableOptions would silently drop them.
+      const auto* const point_lookup_table_options = cf_opts.table_factory->GetOptions<rocksdb::BlockBasedTableOptions>();
+      rocksdb::BlockBasedTableOptions table_options = point_lookup_table_options ? *point_lookup_table_options : rocksdb::BlockBasedTableOptions{};
+      table_options.block_cache = cache;  // replaces the private 4 MB cache of OptimizeForPointLookup(4) with the size-limited cache shared with the DB options
+      table_options.cache_index_and_filter_blocks = true;
+      table_options.cache_index_and_filter_blocks_with_high_priority = true;
+
+      table_options.pin_l0_filter_and_index_blocks_in_cache = false;
+      table_options.pin_top_level_index_and_filter = false;
+      cf_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
+    }
   };
   db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, 
directory_,
     minifi::internal::getRocksDbOptionsToOverride(configuration, 
Configure::nifi_content_repository_rocksdb_options));
diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.cpp 
b/extensions/rocksdb-repos/database/OpenRocksDb.cpp
index c48c4b62c..9989ab3bd 100644
--- a/extensions/rocksdb-repos/database/OpenRocksDb.cpp
+++ b/extensions/rocksdb-repos/database/OpenRocksDb.cpp
@@ -135,23 +135,23 @@ std::optional<uint64_t> 
OpenRocksDb::getApproximateSizes() const {
   return std::nullopt;
 }
 
-minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() {
-  minifi::core::RepositoryMetricsSource::RocksDbStats stats;
-  std::string table_readers;
-  GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
+void OpenRocksDb::fillU64FromProperty(uint64_t& member, std::string_view 
property_name) {
+  std::string property_value;
+  GetProperty(property_name, &property_value);
   try {
-    stats.table_readers_size = std::stoull(table_readers);
+    member = std::stoull(property_value);
   } catch (const std::exception&) {
-    logger_->log_warn("Could not retrieve valid 
'rocksdb.estimate-table-readers-mem' property value from rocksdb content 
repository!");
+    logger_->log_warn("Could not retrieve valid '{}' property value from 
rocksdb content repository!", property_name);
   }
+}
 
-  std::string all_memtables;
-  GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
-  try {
-    stats.all_memory_tables_size = std::stoull(all_memtables);
-  } catch (const std::exception&) {
-    logger_->log_warn("Could not retrieve valid 
'rocksdb.cur-size-all-mem-tables' property value from rocksdb content 
repository!");
-  }
+
+minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() {
+  minifi::core::RepositoryMetricsSource::RocksDbStats stats;
+  fillU64FromProperty(stats.table_readers_size, 
"rocksdb.estimate-table-readers-mem");
+  fillU64FromProperty(stats.all_memory_tables_size, 
"rocksdb.cur-size-all-mem-tables");
+  fillU64FromProperty(stats.block_cache_usage, "rocksdb.block-cache-usage");
+  fillU64FromProperty(stats.block_cache_pinned_usage, 
"rocksdb.block-cache-pinned-usage");
 
   return stats;
 }
diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.h 
b/extensions/rocksdb-repos/database/OpenRocksDb.h
index b97fe880a..dfd93b28c 100644
--- a/extensions/rocksdb-repos/database/OpenRocksDb.h
+++ b/extensions/rocksdb-repos/database/OpenRocksDb.h
@@ -81,6 +81,8 @@ class OpenRocksDb {
   void handleResult(const rocksdb::Status& result);
   void handleResult(const std::vector<rocksdb::Status>& results);
 
+  void fillU64FromProperty(uint64_t& member, std::string_view property_name);
+
   gsl::not_null<RocksDbInstance*> db_;
   gsl::not_null<std::shared_ptr<rocksdb::DB>> impl_;
   gsl::not_null<std::shared_ptr<ColumnHandle>> column_;
diff --git a/extensions/rocksdb-repos/database/RocksDbUtils.h 
b/extensions/rocksdb-repos/database/RocksDbUtils.h
index 627517577..07eaed95c 100644
--- a/extensions/rocksdb-repos/database/RocksDbUtils.h
+++ b/extensions/rocksdb-repos/database/RocksDbUtils.h
@@ -49,6 +49,18 @@ class Writable {
     }
   }
 
+  template <typename Method, typename... Args>
+  decltype(auto) call(Method method, Args&&... args) {
+    return std::invoke(method, target_, std::forward<Args>(args)...);
+  }
+
+  void optimizeForSmallDb(std::shared_ptr<rocksdb::Cache> cache, 
std::shared_ptr<rocksdb::WriteBufferManager> wbm) {
+    if (!cache || !wbm) { return; }
+    target_.OptimizeForSmallDb(&cache);
+    target_.write_buffer_manager = wbm;
+    target_.max_open_files = 20;
+  }
+
   template<typename F>
   const F& get(F T::* member) {
     return target_.*member;
diff --git a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp 
b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp
index 50195748b..835487039 100644
--- a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp
+++ b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp
@@ -355,3 +355,44 @@ TEST_CASE("DBContentRepository can clear orphan entries") {
 
   REQUIRE(getDbSize(dir) == 0);
 }
+
+TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size default") {
+  
LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>();
+
+  const auto configuration = 
std::make_shared<org::apache::nifi::minifi::ConfigureImpl>();
+  const auto content_repo = 
std::make_shared<core::repository::DatabaseContentRepository>();
+  REQUIRE(content_repo->initialize(configuration));
+
+  CHECK(LogTestController::getInstance().contains("Using 8388608 sized cache 
for DatabaseContentRepository"));
+}
+
+TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size override") {
+  
LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>();
+
+  const auto configuration = 
std::make_shared<org::apache::nifi::minifi::ConfigureImpl>();
+  
configuration->set("nifi.database.content.repository.optimize.for.small.db.cache.size",
 "100 MB");
+  const auto content_repo = 
std::make_shared<core::repository::DatabaseContentRepository>();
+  REQUIRE(content_repo->initialize(configuration));
+
+  CHECK(LogTestController::getInstance().contains("Using 104857600 sized cache 
for DatabaseContentRepository"));
+}
+
+TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size disable") {
+  
LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>();
+
+  const auto configuration = 
std::make_shared<org::apache::nifi::minifi::ConfigureImpl>();
+  
configuration->set("nifi.database.content.repository.optimize.for.small.db.cache.size",
 "");
+  const auto content_repo = 
std::make_shared<core::repository::DatabaseContentRepository>();
+  REQUIRE(content_repo->initialize(configuration));
+
+  CHECK(LogTestController::getInstance().contains("Cache limitation disabled 
for DatabaseContentRepository"));
+}
+
+TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size invalid") {
+  
LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>();
+
+  const auto configuration = 
std::make_shared<org::apache::nifi::minifi::ConfigureImpl>();
+  
configuration->set("nifi.database.content.repository.optimize.for.small.db.cache.size",
 "two million bytes");
+  const auto content_repo = 
std::make_shared<core::repository::DatabaseContentRepository>();
+  REQUIRE_THROWS(content_repo->initialize(configuration));
+}
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index 191705fac..17dba361b 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -54,6 +54,7 @@ const std::unordered_map<std::string_view, 
gsl::not_null<const core::PropertyVal
   {Configuration::nifi_flowfile_repository_rocksdb_read_verify_checksums, 
gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)},
   {Configuration::nifi_provenance_repository_rocksdb_read_verify_checksums, 
gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)},
   {Configuration::nifi_rocksdb_state_storage_read_verify_checksums, 
gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)},
+  {Configuration::nifi_dbcontent_optimize_for_small_db_cache_size, 
gsl::make_not_null(&core::StandardPropertyValidators::DATA_SIZE_VALIDATOR)},
   {Configuration::nifi_dbcontent_repository_purge_period, 
gsl::make_not_null(&core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)},
   {Configuration::nifi_remote_input_secure, 
gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)},
   {Configuration::nifi_security_need_ClientAuth, 
gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)},
diff --git a/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp 
b/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp
index 0d4789f8c..ef414837a 100644
--- a/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp
+++ b/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp
@@ -48,6 +48,8 @@ std::vector<SerializedResponseNode> 
RepositoryMetricsSourceStore::serialize() co
     if (auto rocksdb_stats = repo->getRocksDbStats()) {
       parent.children.push_back({.name = "rocksDbTableReadersSize", .value = 
rocksdb_stats->table_readers_size});
       parent.children.push_back({.name = "rocksDbAllMemoryTablesSize", .value 
= rocksdb_stats->all_memory_tables_size});
+      parent.children.push_back({.name = "rocksDbBlockCacheUsage", .value = 
rocksdb_stats->block_cache_usage});
+      parent.children.push_back({.name = "rocksDbBlockCachePinnedUsage", 
.value = rocksdb_stats->block_cache_pinned_usage});
     }
 
     serialized.push_back(parent);
@@ -68,6 +70,10 @@ std::vector<PublishedMetric> 
RepositoryMetricsSourceStore::calculateMetrics() co
         {{"metric_class", name_}, {"repository_name", 
repo->getRepositoryName()}}});
       metrics.push_back({"rocksdb_all_memory_tables_size_bytes", 
static_cast<double>(rocksdb_stats->all_memory_tables_size),
         {{"metric_class", name_}, {"repository_name", 
repo->getRepositoryName()}}});
+      metrics.push_back({"rocksdb_block_cache_usage_bytes", 
static_cast<double>(rocksdb_stats->block_cache_usage),
+        {{"metric_class", name_}, {"repository_name", 
repo->getRepositoryName()}}});
+      metrics.push_back({"rocksdb_block_cache_pinned_usage_bytes", 
static_cast<double>(rocksdb_stats->block_cache_pinned_usage),
+        {{"metric_class", name_}, {"repository_name", 
repo->getRepositoryName()}}});
     }
   }
   return metrics;
diff --git a/libminifi/test/libtest/unit/ProvenanceTestHelper.h 
b/libminifi/test/libtest/unit/ProvenanceTestHelper.h
index 2b5b70bca..4da9cea14 100644
--- a/libminifi/test/libtest/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/libtest/unit/ProvenanceTestHelper.h
@@ -161,7 +161,9 @@ class TestRocksDbRepository : public TestThreadedRepository 
{
   std::optional<RocksDbStats> getRocksDbStats() const override {
     return RocksDbStats {
       .table_readers_size = 100,
-      .all_memory_tables_size = 200
+      .all_memory_tables_size = 200,
+      .block_cache_usage = 85,
+      .block_cache_pinned_usage = 50
     };
   }
 };
diff --git a/libminifi/test/libtest/unit/TestUtils.cpp 
b/libminifi/test/libtest/unit/TestUtils.cpp
index c56f44f75..9064ed357 100644
--- a/libminifi/test/libtest/unit/TestUtils.cpp
+++ b/libminifi/test/libtest/unit/TestUtils.cpp
@@ -260,4 +260,43 @@ std::error_code sendMessagesViaSSL(const 
std::vector<std::string_view>& contents
   return {};
 }
 
+std::vector<LogMessageView> extractLogMessageViews(const std::string& log_str) {  // the returned views alias log_str, which must outlive them
+  std::vector<LogMessageView> messages;
+  const std::regex header_pattern(R"(\[([\d\-\s\:\.]+)\]\s+\[(.*?)\]\s+\[(.*?)\])");
+  struct HeaderMarker {
+    size_t start;
+    std::string_view timestamp;
+    std::string_view logger_class;
+    std::string_view log_level;
+    size_t end;
+  };
+  // log_str is captured by reference: a [=] capture would copy the string into the closure, leaving the views below dangling once the pipeline's temporaries die.
+  std::vector<HeaderMarker> markers = ranges::subrange<std::sregex_iterator>(std::sregex_iterator(log_str.begin(), log_str.end(), header_pattern),
+                                       std::sregex_iterator()) |
+      ranges::views::transform([&log_str](const std::smatch& m) {
+        return HeaderMarker{.start = static_cast<size_t>(m.position(0)),
+            .timestamp = std::string_view{log_str.data() + m.position(1), static_cast<size_t>(m.length(1))},
+            .logger_class = std::string_view{log_str.data() + m.position(2), static_cast<size_t>(m.length(2))},
+            .log_level = std::string_view{log_str.data() + m.position(3), static_cast<size_t>(m.length(3))},
+            .end = static_cast<size_t>(m.position(0) + m.length(0))
+        };
+      }) | ranges::to<std::vector>();
+  // End-of-input sentinel, so the last real header also has a successor that bounds its payload.
+  markers.push_back(HeaderMarker{.start = log_str.size(),
+      .timestamp = {},
+      .logger_class = {},
+      .log_level = {},
+      .end = log_str.size()
+  });
+  // Each payload runs from the end of its header to the start of the next header (or the sentinel).
+  for (auto window: markers | ranges::views::sliding(2)) {
+    messages.push_back(LogMessageView{.timestamp = window[0].timestamp,
+      .logger_class = window[0].logger_class,
+      .log_level = window[0].log_level,
+      .payload = {log_str.data() + window[0].end, window[1].start - window[0].end}});
+  }
+
+  return messages;
+}
+
 }  // namespace org::apache::nifi::minifi::test::utils
diff --git a/libminifi/test/libtest/unit/TestUtils.h 
b/libminifi/test/libtest/unit/TestUtils.h
index 64af75c7c..c65840540 100644
--- a/libminifi/test/libtest/unit/TestUtils.h
+++ b/libminifi/test/libtest/unit/TestUtils.h
@@ -37,9 +37,8 @@
 #include "asio.hpp"
 #include "asio/ssl.hpp"
 #include "utils/net/Ssl.h"
-#include "range/v3/algorithm/any_of.hpp"
 #include "core/Processor.h"
-#include "core/logging/LoggerFactory.h"
+#include <range/v3/all.hpp>
 #include "./ProcessorUtils.h"
 
 using namespace std::literals::chrono_literals;
@@ -127,6 +126,12 @@ bool verifyLogLineVariantPresenceInPollTime(const 
std::chrono::duration<Rep, Per
   return verifyEventHappenedInPollTime(wait_duration, check);
 }
 
+template<class Rep, class Period>
+bool verifyLogMatchesRegexInPollTime(const std::chrono::duration<Rep, Period>& 
wait_duration, const std::string& regex) {
+  auto check = [&regex] { return 
LogTestController::getInstance().matchesRegex(regex); };
+  return verifyEventHappenedInPollTime(wait_duration, check);
+}
+
 namespace internal {
 struct JsonContext {
   const JsonContext *parent{nullptr};
@@ -234,6 +239,15 @@ inline bool runningAsUnixRoot() {
 #endif
 }
 
+struct LogMessageView {
+  std::string_view timestamp;
+  std::string_view logger_class;
+  std::string_view log_level;
+  std::string_view payload;
+};
+
+std::vector<LogMessageView> extractLogMessageViews(const std::string& log_str);
+
 }  // namespace org::apache::nifi::minifi::test::utils
 
 namespace Catch {
diff --git a/libminifi/test/unit/LogMetricsPublisherTests.cpp 
b/libminifi/test/unit/LogMetricsPublisherTests.cpp
index abf8d5509..39ccb235b 100644
--- a/libminifi/test/unit/LogMetricsPublisherTests.cpp
+++ b/libminifi/test/unit/LogMetricsPublisherTests.cpp
@@ -18,11 +18,11 @@
 #include <memory>
 #include <thread>
 
-#include "unit/TestBase.h"
-#include "unit/Catch.h"
+#include "core/RepositoryFactory.h"
 #include "core/state/LogMetricsPublisher.h"
 #include "core/state/nodes/ResponseNodeLoader.h"
-#include "core/RepositoryFactory.h"
+#include "unit/Catch.h"
+#include "unit/TestBase.h"
 #include "unit/TestUtils.h"
 #include "utils/file/FileUtils.h"
 
@@ -33,12 +33,12 @@ namespace org::apache::nifi::minifi::test {
 class LogPublisherTestFixture {
  public:
   LogPublisherTestFixture()
-    : configuration_(std::make_shared<ConfigureImpl>()),
-      provenance_repo_(core::createRepository("provenancerepository", 
"provenancerepository")),
-      flow_file_repo_(core::createRepository("flowfilerepository", 
"flowfilerepository")),
-      
response_node_loader_(std::make_shared<state::response::ResponseNodeLoaderImpl>(configuration_,
-        
std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{provenance_repo_, 
flow_file_repo_}, nullptr)),
-      
publisher_(std::make_unique<minifi::state::LogMetricsPublisher>("LogMetricsPublisher"))
 {
+      : configuration_(std::make_shared<ConfigureImpl>()),
+        provenance_repo_(core::createRepository("provenancerepository", 
"provenancerepository")),
+        flow_file_repo_(core::createRepository("flowfilerepository", 
"flowfilerepository")),
+        
response_node_loader_(std::make_shared<state::response::ResponseNodeLoaderImpl>(configuration_,
+            
std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{provenance_repo_, 
flow_file_repo_}, nullptr)),
+        
publisher_(std::make_unique<minifi::state::LogMetricsPublisher>("LogMetricsPublisher"))
 {
     provenance_repo_->initialize(configuration_);
     flow_file_repo_->initialize(configuration_);
   }
@@ -67,13 +67,13 @@ class LogPublisherTestFixture {
 TEST_CASE_METHOD(LogPublisherTestFixture, "Logging interval property is 
mandatory", "[LogMetricsPublisher]") {
   
LogTestController::getInstance().setTrace<minifi::state::LogMetricsPublisher>();
   SECTION("No logging interval is set") {
-    REQUIRE_THROWS_WITH(publisher_->initialize(configuration_, 
response_node_loader_), "General Operation: Metrics logging interval not 
configured for log metrics publisher!");
+    REQUIRE_THROWS_WITH(publisher_->initialize(configuration_, 
response_node_loader_),
+        "General Operation: Metrics logging interval not configured for log 
metrics publisher!");
   }
   SECTION("Logging interval is set to 2 seconds") {
     
configuration_->set(minifi::Configuration::nifi_metrics_publisher_log_metrics_logging_interval,
 "2s");
-    using 
org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
     publisher_->initialize(configuration_, response_node_loader_);
-    REQUIRE(verifyLogLinePresenceInPollTime(5s, "Metric logging interval is 
set to 2000ms"));
+    REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, "Metric logging 
interval is set to 2000ms"));
   }
 }
 
@@ -86,8 +86,43 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify empty 
metrics if no valid metr
   }
   publisher_->initialize(configuration_, response_node_loader_);
   publisher_->loadMetricNodes();
-  using 
org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
-  REQUIRE(verifyLogLinePresenceInPollTime(5s, "LogMetricsPublisher is 
configured without any valid metrics!"));
+  REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, "LogMetricsPublisher is 
configured without any valid metrics!"));
+}
+
+bool check_exact_metrics_value(const rapidjson::Value& repo_metrics, const std::string_view key, const std::string_view expected_value) {
+  // True iff repo_metrics is an object whose `key` member is a string equal to expected_value; type mismatches yield false instead of tripping RAPIDJSON_ASSERT.
+  if (!repo_metrics.IsObject()) { return false; }
+  const rapidjson::Value key_value(rapidjson::StringRef(key.data(), key.size()));  // carries the explicit length, so key need not be null-terminated
+  const auto member_it = repo_metrics.FindMember(key_value);
+  return member_it != repo_metrics.MemberEnd() && member_it->value.IsString() && std::string_view{member_it->value.GetString(), member_it->value.GetStringLength()} == expected_value;
+}
+
+bool isExpectedRepositoryMetricsLogMessage(const utils::LogMessageView& message_view, std::string_view log_level) {
+  // True iff message_view is a LogMetricsPublisher message at `log_level` whose JSON payload reports both test repositories with the expected values.
+  if (message_view.log_level != log_level || message_view.logger_class != "org::apache::nifi::minifi::state::LogMetricsPublisher") { return false; }
+  rapidjson::Document document;
+  if (const rapidjson::ParseResult res = document.Parse(message_view.payload.data(), message_view.payload.length()); !res) { return false; }
+  // Null-safe, type-checked lookup of an object-valued member (HasMember/GetObject would RAPIDJSON_ASSERT on non-objects and abort the test run).
+  const auto object_member = [](const rapidjson::Value* parent, const char* name) -> const rapidjson::Value* {
+    if (parent == nullptr || !parent->IsObject()) { return nullptr; }
+    const auto it = parent->FindMember(name);
+    return (it != parent->MemberEnd() && it->value.IsObject()) ? &it->value : nullptr;
+  };
+  const rapidjson::Value* const repository_metrics = object_member(object_member(&document, "LogMetrics"), "RepositoryMetrics");
+  const rapidjson::Value* const provenance_repo_metrics = object_member(repository_metrics, "provenancerepository");
+  const rapidjson::Value* const flow_file_repo_metrics = object_member(repository_metrics, "flowfilerepository");
+  const auto repo_is_okay = [](const rapidjson::Value& repo_metrics) -> bool {
+    return check_exact_metrics_value(repo_metrics, "full", "false")
+        && check_exact_metrics_value(repo_metrics, "running", "false")
+        && check_exact_metrics_value(repo_metrics, "size", "0")
+        && check_exact_metrics_value(repo_metrics, "maxSize", "0")
+        && check_exact_metrics_value(repo_metrics, "entryCount", "0")
+        && check_exact_metrics_value(repo_metrics, "rocksDbTableReadersSize", "0")
+        && check_exact_metrics_value(repo_metrics, "rocksDbAllMemoryTablesSize", "2048")
+        && repo_metrics.HasMember("rocksDbBlockCacheUsage")
+        && repo_metrics.HasMember("rocksDbBlockCachePinnedUsage");
+  };
+  return provenance_repo_metrics && flow_file_repo_metrics && repo_is_okay(*provenance_repo_metrics) && repo_is_okay(*flow_file_repo_metrics);
 }
 
 TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in 
logs", "[LogMetricsPublisher]") {
@@ -96,34 +131,14 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple 
metric nodes in logs"
   configuration_->set(Configure::nifi_metrics_publisher_metrics, 
"RepositoryMetrics,DeviceInfoNode");
   publisher_->initialize(configuration_, response_node_loader_);
   publisher_->loadMetricNodes();
-  using 
org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
-  std::string expected_log_1 = R"([info] {
-    "LogMetrics": {)";
-  std::string expected_log_2 = R"("RepositoryMetrics": {
-            "provenancerepository": {
-                "running": "false",
-                "full": "false",
-                "size": "0",
-                "maxSize": "0",
-                "entryCount": "0",
-                "rocksDbTableReadersSize": "0",
-                "rocksDbAllMemoryTablesSize": "2048"
-            },
-            "flowfilerepository": {
-                "running": "false",
-                "full": "false",
-                "size": "0",
-                "maxSize": "0",
-                "entryCount": "0",
-                "rocksDbTableReadersSize": "0",
-                "rocksDbAllMemoryTablesSize": "2048"
-            }
-        })";
-  std::string expected_log_3 = R"("deviceInfo": {
-            "identifier":)";
-  REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_1));
-  REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_2));
-  REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_3));
+  REQUIRE(utils::verifyEventHappenedInPollTime(
+      5s,
+      [] {
+        const auto logs = LogTestController::getInstance().getLogs();
+        const auto message_views = utils::extractLogMessageViews(logs);
+        return ranges::any_of(message_views, [](const auto& msg_view) { return 
isExpectedRepositoryMetricsLogMessage(msg_view, "info"); });
+      },
+      100ms));
 }
 
 TEST_CASE_METHOD(LogPublisherTestFixture, "Verify reloading different 
metrics", "[LogMetricsPublisher]") {
@@ -132,42 +147,25 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify 
reloading different metrics",
   configuration_->set(Configure::nifi_metrics_publisher_metrics, 
"RepositoryMetrics");
   publisher_->initialize(configuration_, response_node_loader_);
   publisher_->loadMetricNodes();
-  using 
org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
-  std::string expected_log = R"([info] {
-    "LogMetrics": {
-        "RepositoryMetrics": {
-            "provenancerepository": {
-                "running": "false",
-                "full": "false",
-                "size": "0",
-                "maxSize": "0",
-                "entryCount": "0",
-                "rocksDbTableReadersSize": "0",
-                "rocksDbAllMemoryTablesSize": "2048"
-            },
-            "flowfilerepository": {
-                "running": "false",
-                "full": "false",
-                "size": "0",
-                "maxSize": "0",
-                "entryCount": "0",
-                "rocksDbTableReadersSize": "0",
-                "rocksDbAllMemoryTablesSize": "2048"
-            }
-        }
-    }
-})";
-  REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log));
+
+  REQUIRE(utils::verifyEventHappenedInPollTime(
+      5s,
+      [] {
+        const auto logs = LogTestController::getInstance().getLogs();
+        const auto message_views = utils::extractLogMessageViews(logs);
+        return ranges::any_of(message_views, [](const auto& msg_view) { return 
isExpectedRepositoryMetricsLogMessage(msg_view, "info"); });
+      },
+      100ms));
   publisher_->clearMetricNodes();
   LogTestController::getInstance().reset();
   
LogTestController::getInstance().setTrace<minifi::state::LogMetricsPublisher>();
   configuration_->set(Configure::nifi_metrics_publisher_metrics, 
"DeviceInfoNode");
   publisher_->loadMetricNodes();
-  expected_log = R"([info] {
+  std::string expected_log = R"([info] {
     "LogMetrics": {
         "deviceInfo": {
             "identifier":)";
-  REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log));
+  REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, expected_log));
 }
 
 TEST_CASE_METHOD(LogPublisherTestFixture, "Verify generic and publisher 
specific metric properties", "[LogMetricsPublisher]") {
@@ -185,32 +183,14 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify generic 
and publisher specific
   }
   publisher_->initialize(configuration_, response_node_loader_);
   publisher_->loadMetricNodes();
-  using 
org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
-  std::string expected_log = R"([info] {
-    "LogMetrics": {
-        "RepositoryMetrics": {
-            "provenancerepository": {
-                "running": "false",
-                "full": "false",
-                "size": "0",
-                "maxSize": "0",
-                "entryCount": "0",
-                "rocksDbTableReadersSize": "0",
-                "rocksDbAllMemoryTablesSize": "2048"
-            },
-            "flowfilerepository": {
-                "running": "false",
-                "full": "false",
-                "size": "0",
-                "maxSize": "0",
-                "entryCount": "0",
-                "rocksDbTableReadersSize": "0",
-                "rocksDbAllMemoryTablesSize": "2048"
-            }
-        }
-    }
-})";
-  REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log));
+  REQUIRE(utils::verifyEventHappenedInPollTime(
+      5s,
+      [] {
+        const auto logs = LogTestController::getInstance().getLogs();
+        const auto message_views = utils::extractLogMessageViews(logs);
+        return ranges::any_of(message_views, [](const auto& msg_view) { return 
isExpectedRepositoryMetricsLogMessage(msg_view, "info"); });
+      },
+      100ms));
 }
 
 TEST_CASE_METHOD(LogPublisherTestFixture, "Verify changing log level property 
for logging", "[LogMetricsPublisher]") {
@@ -220,32 +200,14 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify 
changing log level property fo
   configuration_->set(Configure::nifi_metrics_publisher_metrics, 
"RepositoryMetrics");
   publisher_->initialize(configuration_, response_node_loader_);
   publisher_->loadMetricNodes();
-  using 
org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
-  std::string expected_log = R"([debug] {
-    "LogMetrics": {
-        "RepositoryMetrics": {
-            "provenancerepository": {
-                "running": "false",
-                "full": "false",
-                "size": "0",
-                "maxSize": "0",
-                "entryCount": "0",
-                "rocksDbTableReadersSize": "0",
-                "rocksDbAllMemoryTablesSize": "2048"
-            },
-            "flowfilerepository": {
-                "running": "false",
-                "full": "false",
-                "size": "0",
-                "maxSize": "0",
-                "entryCount": "0",
-                "rocksDbTableReadersSize": "0",
-                "rocksDbAllMemoryTablesSize": "2048"
-            }
-        }
-    }
-})";
-  REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log));
+  REQUIRE(utils::verifyEventHappenedInPollTime(
+      5s,
+      [] {
+        const auto logs = LogTestController::getInstance().getLogs();
+        const auto message_views = utils::extractLogMessageViews(logs);
+        return ranges::any_of(message_views, [](const auto& msg_view) { return 
isExpectedRepositoryMetricsLogMessage(msg_view, "debug"); });
+      },
+      100ms));
 }
 
 }  // namespace org::apache::nifi::minifi::test
diff --git a/libminifi/test/unit/MetricsTests.cpp 
b/libminifi/test/unit/MetricsTests.cpp
index 25840d2e6..4fe02defd 100644
--- a/libminifi/test/unit/MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -104,7 +104,7 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
 
   SECTION("RocksDB repository") {
     repo = std::make_shared<TestRocksDbRepository>();
-    expected_metric_count = 7;
+    expected_metric_count = 9;
   }
 
 
@@ -125,6 +125,8 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
     if (expected_metric_count > 5) {
       checkSerializedValue(resp.children, "rocksDbTableReadersSize", "100");
       checkSerializedValue(resp.children, "rocksDbAllMemoryTablesSize", "200");
+      checkSerializedValue(resp.children, "rocksDbBlockCacheUsage", "85");
+      checkSerializedValue(resp.children, "rocksDbBlockCachePinnedUsage", 
"50");
     }
   }
 
diff --git a/minifi-api/include/minifi-cpp/core/RepositoryMetricsSource.h 
b/minifi-api/include/minifi-cpp/core/RepositoryMetricsSource.h
index ca55e461c..21f2308d1 100644
--- a/minifi-api/include/minifi-cpp/core/RepositoryMetricsSource.h
+++ b/minifi-api/include/minifi-cpp/core/RepositoryMetricsSource.h
@@ -28,6 +28,8 @@ class RepositoryMetricsSource {
   struct RocksDbStats {
     uint64_t table_readers_size{};
     uint64_t all_memory_tables_size{};
+    uint64_t block_cache_usage{};
+    uint64_t block_cache_pinned_usage{};
   };
 
   virtual ~RepositoryMetricsSource() = default;
diff --git a/minifi-api/include/minifi-cpp/properties/Configuration.h 
b/minifi-api/include/minifi-cpp/properties/Configuration.h
index 8ed94061b..44e50ea0f 100644
--- a/minifi-api/include/minifi-cpp/properties/Configuration.h
+++ b/minifi-api/include/minifi-cpp/properties/Configuration.h
@@ -76,6 +76,7 @@ class Configuration : public virtual Properties {
   static constexpr const char 
*nifi_flowfile_repository_rocksdb_read_verify_checksums = 
"nifi.flowfile.repository.rocksdb.read.verify.checksums";
   static constexpr const char 
*nifi_provenance_repository_rocksdb_read_verify_checksums = 
"nifi.provenance.repository.rocksdb.read.verify.checksums";
   static constexpr const char 
*nifi_rocksdb_state_storage_read_verify_checksums = 
"nifi.rocksdb.state.storage.read.verify.checksums";
+  static constexpr const char *nifi_dbcontent_optimize_for_small_db_cache_size 
= "nifi.database.content.repository.optimize.for.small.db.cache.size";
 
   static constexpr const char *nifi_remote_input_secure = 
"nifi.remote.input.secure";
   static constexpr const char *nifi_security_need_ClientAuth = 
"nifi.security.need.ClientAuth";


Reply via email to