This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 4a857ef4a46e5bb1f28b3b8f096e7f0ca006bdc2 Author: Martin Zink <[email protected]> AuthorDate: Wed Mar 25 00:53:26 2026 +0100 MINIFICPP-2696 Add option for optimizationForSmallDB in DatabaseConte… I went ahead and did some testing so we can set a sensible default value for this new configurable value. Based on my testing it seems the processing overhead is negligible; unfortunately, the memory gains are also not what one would expect (flow specific). I’ve created a bunch of flows and tried them with various cache sizes. - Simple GenerateFlowFile → LogAttribute 5MB Text (random) - processing throughput remained the same while achieving 10% lower memory footprint on average, but the memory usage was not consistent - TailFile(30k lines) → MergeFile → LogAttribute, with GetFile → LogAttribute (to increase memory usage) - Processing time of the tailfile remained within run-to-run variance with 20-30% lower memory footprint I’ve also noticed that compaction time and periodic https://man7.org/linux/man-pages/man3/malloc_trim.3.html make the memory footprint even smaller (much more noticeably in some cases) Methodology Processing time was measured via log messages, memory footprint was measured via prometheus AgentStatus metrics Baseline was setting the new variable to an empty string (which basically mimics the current main behaviour) **I couldn’t measure performance losses but only tried with a small set of relatively simple flows. Since this PR now changes the default behavior, it’s important that we emphasise this change and the method to disable this in the release logs.** Closes #2081 Signed-off-by: Marton Szasz <[email protected]> --- CONFIGURE.md | 14 ++ conf/minifi.properties.in | 1 + .../rocksdb-repos/DatabaseContentRepository.cpp | 40 +++- extensions/rocksdb-repos/database/OpenRocksDb.cpp | 26 +-- extensions/rocksdb-repos/database/OpenRocksDb.h | 2 + extensions/rocksdb-repos/database/RocksDbUtils.h | 12 ++ .../tests/DBContentRepositoryTests.cpp | 41 ++++ 
libminifi/src/Configuration.cpp | 1 + .../state/nodes/RepositoryMetricsSourceStore.cpp | 6 + libminifi/test/libtest/unit/ProvenanceTestHelper.h | 4 +- libminifi/test/libtest/unit/TestUtils.cpp | 39 ++++ libminifi/test/libtest/unit/TestUtils.h | 18 +- libminifi/test/unit/LogMetricsPublisherTests.cpp | 206 +++++++++------------ libminifi/test/unit/MetricsTests.cpp | 4 +- .../minifi-cpp/core/RepositoryMetricsSource.h | 2 + .../include/minifi-cpp/properties/Configuration.h | 1 + 16 files changed, 275 insertions(+), 142 deletions(-) diff --git a/CONFIGURE.md b/CONFIGURE.md index 62e8a8768..40cb44b7b 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -41,6 +41,7 @@ - [Configuring Repositories](#configuring-repositories) - [Configuring Volatile Repositories](#configuring-volatile-repositories) - [Configuring Repository storage locations](#configuring-repository-storage-locations) + - [Configuring cache size for rocksdb content repository](#configuring-cache-size-for-rocksdb-content-repository) - [Configuring compression for rocksdb database](#configuring-compression-for-rocksdb-database) - [Configuring compaction for rocksdb database](#configuring-compaction-for-rocksdb-database) - [Configuring synchronous or asynchronous writes for RocksDB content repository](#configuring-synchronous-or-asynchronous-writes-for-rocksdb-content-repository) @@ -674,6 +675,19 @@ In a Filesystem Hierarchy Standard (FHS) installation (from an RPM package), the nifi.flowfile.repository.directory.default=/var/lib/nifi-minifi-cpp/flowfile_repository nifi.database.content.repository.directory.default=/var/lib/nifi-minifi-cpp/content_repository +### Configuring cache size for rocksdb content repository + +The RocksDB content repository uses a cache to limit memory usage. The cache size can be configured using the following property. +This should limit the memory usage but may cause minimal processing overhead. 
+ + # in minifi.properties + nifi.database.content.repository.optimize.for.small.db.cache.size=8 MB + +You can disable this cache by setting it to an empty value. + + # in minifi.properties + nifi.database.content.repository.optimize.for.small.db.cache.size= + ### Configuring compression for rocksdb database diff --git a/conf/minifi.properties.in b/conf/minifi.properties.in index 636632e8c..afcb7f9d9 100644 --- a/conf/minifi.properties.in +++ b/conf/minifi.properties.in @@ -52,6 +52,7 @@ nifi.content.repository.class.name=DatabaseContentRepository ## Relates to the internal workings of the rocksdb backend # nifi.flowfile.repository.rocksdb.compaction.period=2 min # nifi.database.content.repository.rocksdb.compaction.period=2 min +# nifi.database.content.repository.optimize.for.small.db.cache.size=8 MB # setting this value to "0" enables synchronous deletion # nifi.database.content.repository.purge.period = 1 sec diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index ddb7b0dc0..80b3a832e 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -58,21 +58,55 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu setCompactionPeriod(configuration); - auto set_db_opts = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) { + const auto cache_size = configuration->get(Configure::nifi_dbcontent_optimize_for_small_db_cache_size) + | utils::orElse([] { return std::make_optional<std::string>("8 MB"); }) + | utils::andThen([](const auto& cache_size_str) -> std::optional<uint64_t> { + if (cache_size_str.empty()) { + return std::nullopt; + } + return parsing::parseDataSize(cache_size_str) | utils::orThrow(fmt::format("{} has invalid format {}", Configure::nifi_dbcontent_optimize_for_small_db_cache_size, cache_size_str)); + }); + + std::shared_ptr<rocksdb::Cache> cache = nullptr; 
+ std::shared_ptr<rocksdb::WriteBufferManager> wbm = nullptr; + if (cache_size) { + cache = rocksdb::NewLRUCache(*cache_size); + wbm = std::make_shared<rocksdb::WriteBufferManager>(0, cache); + logger_->log_trace("Using {} sized cache for DatabaseContentRepository", *cache_size); + } else { + logger_->log_trace("Cache limitation disabled for DatabaseContentRepository"); + } + + auto set_db_opts = [encrypted_env, &cache, &wbm] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) { minifi::internal::setCommonRocksDbOptions(db_opts); if (encrypted_env) { db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{}); } else { db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default()); } + db_opts.optimizeForSmallDb(cache, wbm); }; - auto set_cf_opts = [&configuration] (rocksdb::ColumnFamilyOptions& cf_opts) { + auto set_cf_opts = [&configuration, &cache] (rocksdb::ColumnFamilyOptions& cf_opts) { cf_opts.OptimizeForPointLookup(4); cf_opts.merge_operator = std::make_shared<StringAppender>(); cf_opts.max_successive_merges = 0; - if (auto compression_type = minifi::internal::readConfiguredCompressionType(configuration, Configure::nifi_content_repository_rocksdb_compression)) { + cf_opts.max_write_buffer_number = 2; + cf_opts.write_buffer_size = 4_MB; + if (const auto compression_type = internal::readConfiguredCompressionType(configuration, Configure::nifi_content_repository_rocksdb_compression)) { cf_opts.compression = *compression_type; } + if (cache) { + rocksdb::BlockBasedTableOptions table_options; + table_options.block_cache = cache; + + table_options.cache_index_and_filter_blocks = true; + table_options.cache_index_and_filter_blocks_with_high_priority = true; + + table_options.pin_l0_filter_and_index_blocks_in_cache = false; + table_options.pin_top_level_index_and_filter = false; + + cf_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options)); + } }; db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, 
directory_, minifi::internal::getRocksDbOptionsToOverride(configuration, Configure::nifi_content_repository_rocksdb_options)); diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.cpp b/extensions/rocksdb-repos/database/OpenRocksDb.cpp index c48c4b62c..9989ab3bd 100644 --- a/extensions/rocksdb-repos/database/OpenRocksDb.cpp +++ b/extensions/rocksdb-repos/database/OpenRocksDb.cpp @@ -135,23 +135,23 @@ std::optional<uint64_t> OpenRocksDb::getApproximateSizes() const { return std::nullopt; } -minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() { - minifi::core::RepositoryMetricsSource::RocksDbStats stats; - std::string table_readers; - GetProperty("rocksdb.estimate-table-readers-mem", &table_readers); +void OpenRocksDb::fillU64FromProperty(uint64_t& member, std::string_view property_name) { + std::string property_value; + GetProperty(property_name, &property_value); try { - stats.table_readers_size = std::stoull(table_readers); + member = std::stoull(property_value); } catch (const std::exception&) { - logger_->log_warn("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!"); + logger_->log_warn("Could not retrieve valid '{}' property value from rocksdb content repository!", property_name); } +} - std::string all_memtables; - GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables); - try { - stats.all_memory_tables_size = std::stoull(all_memtables); - } catch (const std::exception&) { - logger_->log_warn("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!"); - } + +minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() { + minifi::core::RepositoryMetricsSource::RocksDbStats stats; + fillU64FromProperty(stats.table_readers_size, "rocksdb.estimate-table-readers-mem"); + fillU64FromProperty(stats.all_memory_tables_size, "rocksdb.cur-size-all-mem-tables"); + fillU64FromProperty(stats.block_cache_usage, 
"rocksdb.block-cache-usage"); + fillU64FromProperty(stats.block_cache_pinned_usage, "rocksdb.block-cache-pinned-usage"); return stats; } diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.h b/extensions/rocksdb-repos/database/OpenRocksDb.h index b97fe880a..dfd93b28c 100644 --- a/extensions/rocksdb-repos/database/OpenRocksDb.h +++ b/extensions/rocksdb-repos/database/OpenRocksDb.h @@ -81,6 +81,8 @@ class OpenRocksDb { void handleResult(const rocksdb::Status& result); void handleResult(const std::vector<rocksdb::Status>& results); + void fillU64FromProperty(uint64_t& member, std::string_view property_name); + gsl::not_null<RocksDbInstance*> db_; gsl::not_null<std::shared_ptr<rocksdb::DB>> impl_; gsl::not_null<std::shared_ptr<ColumnHandle>> column_; diff --git a/extensions/rocksdb-repos/database/RocksDbUtils.h b/extensions/rocksdb-repos/database/RocksDbUtils.h index 627517577..07eaed95c 100644 --- a/extensions/rocksdb-repos/database/RocksDbUtils.h +++ b/extensions/rocksdb-repos/database/RocksDbUtils.h @@ -49,6 +49,18 @@ class Writable { } } + template <typename Method, typename... Args> + decltype(auto) call(Method method, Args&&... 
args) { + return std::invoke(method, target_, std::forward<Args>(args)...); + } + + void optimizeForSmallDb(std::shared_ptr<rocksdb::Cache> cache, std::shared_ptr<rocksdb::WriteBufferManager> wbm) { + if (!cache || !wbm) { return; } + target_.OptimizeForSmallDb(&cache); + target_.write_buffer_manager = wbm; + target_.max_open_files = 20; + } + template<typename F> const F& get(F T::* member) { return target_.*member; diff --git a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp index 50195748b..835487039 100644 --- a/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp +++ b/extensions/rocksdb-repos/tests/DBContentRepositoryTests.cpp @@ -355,3 +355,44 @@ TEST_CASE("DBContentRepository can clear orphan entries") { REQUIRE(getDbSize(dir) == 0); } + +TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size default") { + LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>(); + + const auto configuration = std::make_shared<org::apache::nifi::minifi::ConfigureImpl>(); + const auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + REQUIRE(content_repo->initialize(configuration)); + + CHECK(LogTestController::getInstance().contains("Using 8388608 sized cache for DatabaseContentRepository")); +} + +TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size override") { + LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>(); + + const auto configuration = std::make_shared<org::apache::nifi::minifi::ConfigureImpl>(); + configuration->set("nifi.database.content.repository.optimize.for.small.db.cache.size", "100 MB"); + const auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + REQUIRE(content_repo->initialize(configuration)); + + CHECK(LogTestController::getInstance().contains("Using 104857600 sized cache for DatabaseContentRepository")); +} + 
+TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size disable") { + LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>(); + + const auto configuration = std::make_shared<org::apache::nifi::minifi::ConfigureImpl>(); + configuration->set("nifi.database.content.repository.optimize.for.small.db.cache.size", ""); + const auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + REQUIRE(content_repo->initialize(configuration)); + + CHECK(LogTestController::getInstance().contains("Cache limitation disabled for DatabaseContentRepository")); +} + +TEST_CASE("nifi_dbcontent_optimize_for_small_db_cache_size invalid") { + LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>(); + + const auto configuration = std::make_shared<org::apache::nifi::minifi::ConfigureImpl>(); + configuration->set("nifi.database.content.repository.optimize.for.small.db.cache.size", "two million bytes"); + const auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + REQUIRE_THROWS(content_repo->initialize(configuration)); +} diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index 191705fac..17dba361b 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -54,6 +54,7 @@ const std::unordered_map<std::string_view, gsl::not_null<const core::PropertyVal {Configuration::nifi_flowfile_repository_rocksdb_read_verify_checksums, gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)}, {Configuration::nifi_provenance_repository_rocksdb_read_verify_checksums, gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)}, {Configuration::nifi_rocksdb_state_storage_read_verify_checksums, gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)}, + {Configuration::nifi_dbcontent_optimize_for_small_db_cache_size, 
gsl::make_not_null(&core::StandardPropertyValidators::DATA_SIZE_VALIDATOR)}, {Configuration::nifi_dbcontent_repository_purge_period, gsl::make_not_null(&core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)}, {Configuration::nifi_remote_input_secure, gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)}, {Configuration::nifi_security_need_ClientAuth, gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)}, diff --git a/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp b/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp index 0d4789f8c..ef414837a 100644 --- a/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp +++ b/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp @@ -48,6 +48,8 @@ std::vector<SerializedResponseNode> RepositoryMetricsSourceStore::serialize() co if (auto rocksdb_stats = repo->getRocksDbStats()) { parent.children.push_back({.name = "rocksDbTableReadersSize", .value = rocksdb_stats->table_readers_size}); parent.children.push_back({.name = "rocksDbAllMemoryTablesSize", .value = rocksdb_stats->all_memory_tables_size}); + parent.children.push_back({.name = "rocksDbBlockCacheUsage", .value = rocksdb_stats->block_cache_usage}); + parent.children.push_back({.name = "rocksDbBlockCachePinnedUsage", .value = rocksdb_stats->block_cache_pinned_usage}); } serialized.push_back(parent); @@ -68,6 +70,10 @@ std::vector<PublishedMetric> RepositoryMetricsSourceStore::calculateMetrics() co {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); metrics.push_back({"rocksdb_all_memory_tables_size_bytes", static_cast<double>(rocksdb_stats->all_memory_tables_size), {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); + metrics.push_back({"rocksdb_block_cache_usage_bytes", static_cast<double>(rocksdb_stats->block_cache_usage), + {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); + 
metrics.push_back({"rocksdb_block_cache_pinned_usage_bytes", static_cast<double>(rocksdb_stats->block_cache_pinned_usage), + {{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}}); } } return metrics; diff --git a/libminifi/test/libtest/unit/ProvenanceTestHelper.h b/libminifi/test/libtest/unit/ProvenanceTestHelper.h index 2b5b70bca..4da9cea14 100644 --- a/libminifi/test/libtest/unit/ProvenanceTestHelper.h +++ b/libminifi/test/libtest/unit/ProvenanceTestHelper.h @@ -161,7 +161,9 @@ class TestRocksDbRepository : public TestThreadedRepository { std::optional<RocksDbStats> getRocksDbStats() const override { return RocksDbStats { .table_readers_size = 100, - .all_memory_tables_size = 200 + .all_memory_tables_size = 200, + .block_cache_usage = 85, + .block_cache_pinned_usage = 50 }; } }; diff --git a/libminifi/test/libtest/unit/TestUtils.cpp b/libminifi/test/libtest/unit/TestUtils.cpp index c56f44f75..9064ed357 100644 --- a/libminifi/test/libtest/unit/TestUtils.cpp +++ b/libminifi/test/libtest/unit/TestUtils.cpp @@ -260,4 +260,43 @@ std::error_code sendMessagesViaSSL(const std::vector<std::string_view>& contents return {}; } +std::vector<LogMessageView> extractLogMessageViews(const std::string& log_str) { + std::vector<LogMessageView> messages; + const std::regex header_pattern(R"(\[([\d\-\s\:\.]+)\]\s+\[(.*?)\]\s+\[(.*?)\])"); + struct HeaderMarker { + size_t start; + std::string_view timestamp; + std::string_view logger_class; + std::string_view log_level; + size_t end; + }; + + std::vector<HeaderMarker> markers = ranges::subrange<std::sregex_iterator>(std::sregex_iterator(log_str.begin(), log_str.end(), header_pattern), + std::sregex_iterator()) | + ranges::views::transform([=](const std::smatch& m) { + return HeaderMarker{.start = static_cast<size_t>(m.position(0)), + .timestamp = std::string_view{log_str.data() + m.position(1), static_cast<size_t>(m.length(1))}, + .logger_class = std::string_view{log_str.data() + m.position(2), 
static_cast<size_t>(m.length(2))}, + .log_level = std::string_view{log_str.data() + m.position(3), static_cast<size_t>(m.length(3))}, + .end = static_cast<size_t>(m.position(0) + m.length(0)) + }; + }) | ranges::to<std::vector>(); + + markers.push_back(HeaderMarker{.start = log_str.size(), + .timestamp = {}, + .logger_class = {}, + .log_level = {}, + .end = log_str.size() + }); + + for (auto window: markers | ranges::views::sliding(2)) { + messages.push_back(LogMessageView{.timestamp = window[0].timestamp, + .logger_class = window[0].logger_class, + .log_level = window[0].log_level, + .payload = {log_str.data() + window[0].end, window[1].start - window[0].end}}); + } + + return messages; +} + } // namespace org::apache::nifi::minifi::test::utils diff --git a/libminifi/test/libtest/unit/TestUtils.h b/libminifi/test/libtest/unit/TestUtils.h index 64af75c7c..c65840540 100644 --- a/libminifi/test/libtest/unit/TestUtils.h +++ b/libminifi/test/libtest/unit/TestUtils.h @@ -37,9 +37,8 @@ #include "asio.hpp" #include "asio/ssl.hpp" #include "utils/net/Ssl.h" -#include "range/v3/algorithm/any_of.hpp" #include "core/Processor.h" -#include "core/logging/LoggerFactory.h" +#include <range/v3/all.hpp> #include "./ProcessorUtils.h" using namespace std::literals::chrono_literals; @@ -127,6 +126,12 @@ bool verifyLogLineVariantPresenceInPollTime(const std::chrono::duration<Rep, Per return verifyEventHappenedInPollTime(wait_duration, check); } +template<class Rep, class Period> +bool verifyLogMatchesRegexInPollTime(const std::chrono::duration<Rep, Period>& wait_duration, const std::string& regex) { + auto check = [&regex] { return LogTestController::getInstance().matchesRegex(regex); }; + return verifyEventHappenedInPollTime(wait_duration, check); +} + namespace internal { struct JsonContext { const JsonContext *parent{nullptr}; @@ -234,6 +239,15 @@ inline bool runningAsUnixRoot() { #endif } +struct LogMessageView { + std::string_view timestamp; + std::string_view logger_class; + 
std::string_view log_level; + std::string_view payload; +}; + +std::vector<LogMessageView> extractLogMessageViews(const std::string& log_str); + } // namespace org::apache::nifi::minifi::test::utils namespace Catch { diff --git a/libminifi/test/unit/LogMetricsPublisherTests.cpp b/libminifi/test/unit/LogMetricsPublisherTests.cpp index abf8d5509..39ccb235b 100644 --- a/libminifi/test/unit/LogMetricsPublisherTests.cpp +++ b/libminifi/test/unit/LogMetricsPublisherTests.cpp @@ -18,11 +18,11 @@ #include <memory> #include <thread> -#include "unit/TestBase.h" -#include "unit/Catch.h" +#include "core/RepositoryFactory.h" #include "core/state/LogMetricsPublisher.h" #include "core/state/nodes/ResponseNodeLoader.h" -#include "core/RepositoryFactory.h" +#include "unit/Catch.h" +#include "unit/TestBase.h" #include "unit/TestUtils.h" #include "utils/file/FileUtils.h" @@ -33,12 +33,12 @@ namespace org::apache::nifi::minifi::test { class LogPublisherTestFixture { public: LogPublisherTestFixture() - : configuration_(std::make_shared<ConfigureImpl>()), - provenance_repo_(core::createRepository("provenancerepository", "provenancerepository")), - flow_file_repo_(core::createRepository("flowfilerepository", "flowfilerepository")), - response_node_loader_(std::make_shared<state::response::ResponseNodeLoaderImpl>(configuration_, - std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{provenance_repo_, flow_file_repo_}, nullptr)), - publisher_(std::make_unique<minifi::state::LogMetricsPublisher>("LogMetricsPublisher")) { + : configuration_(std::make_shared<ConfigureImpl>()), + provenance_repo_(core::createRepository("provenancerepository", "provenancerepository")), + flow_file_repo_(core::createRepository("flowfilerepository", "flowfilerepository")), + response_node_loader_(std::make_shared<state::response::ResponseNodeLoaderImpl>(configuration_, + std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{provenance_repo_, flow_file_repo_}, nullptr)), + 
publisher_(std::make_unique<minifi::state::LogMetricsPublisher>("LogMetricsPublisher")) { provenance_repo_->initialize(configuration_); flow_file_repo_->initialize(configuration_); } @@ -67,13 +67,13 @@ class LogPublisherTestFixture { TEST_CASE_METHOD(LogPublisherTestFixture, "Logging interval property is mandatory", "[LogMetricsPublisher]") { LogTestController::getInstance().setTrace<minifi::state::LogMetricsPublisher>(); SECTION("No logging interval is set") { - REQUIRE_THROWS_WITH(publisher_->initialize(configuration_, response_node_loader_), "General Operation: Metrics logging interval not configured for log metrics publisher!"); + REQUIRE_THROWS_WITH(publisher_->initialize(configuration_, response_node_loader_), + "General Operation: Metrics logging interval not configured for log metrics publisher!"); } SECTION("Logging interval is set to 2 seconds") { configuration_->set(minifi::Configuration::nifi_metrics_publisher_log_metrics_logging_interval, "2s"); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; publisher_->initialize(configuration_, response_node_loader_); - REQUIRE(verifyLogLinePresenceInPollTime(5s, "Metric logging interval is set to 2000ms")); + REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, "Metric logging interval is set to 2000ms")); } } @@ -86,8 +86,43 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify empty metrics if no valid metr } publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; - REQUIRE(verifyLogLinePresenceInPollTime(5s, "LogMetricsPublisher is configured without any valid metrics!")); + REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, "LogMetricsPublisher is configured without any valid metrics!")); +} + +bool check_exact_metrics_value(const rapidjson::Value& repo_metrics, const std::string_view key, const std::string_view expected_value) { + const auto key_ref = 
rapidjson::StringRef(key.data(), key.size()); + const auto member_it = repo_metrics.FindMember(key_ref); + if (member_it == repo_metrics.MemberEnd()) { return false; } + const auto actual_value = std::string_view{member_it->value.GetString(), member_it->value.GetStringLength()}; + return actual_value == expected_value; +} + +bool isExpectedRepositoryMetricsLogMessage(const utils::LogMessageView& message_view, std::string_view log_level) { + if (message_view.log_level != log_level) { return false; } + if (message_view.logger_class != "org::apache::nifi::minifi::state::LogMetricsPublisher") { return false; } + rapidjson::Document document; + if (const rapidjson::ParseResult res = document.Parse(message_view.payload.data(), message_view.payload.length()); !res) { return false; } + if (!document.HasMember("LogMetrics")) { return false; } + const auto& log_metrics = document["LogMetrics"].GetObject(); + if (!log_metrics.HasMember("RepositoryMetrics")) { return false; } + const auto& repository_metrics = log_metrics["RepositoryMetrics"].GetObject(); + if (!repository_metrics.HasMember("provenancerepository") || !repository_metrics.HasMember("flowfilerepository")) { return false; } + const rapidjson::Value& provenance_repo_metrics = repository_metrics["provenancerepository"]; + const rapidjson::Value& flow_file_repo_metrics = repository_metrics["flowfilerepository"]; + + const auto repo_is_okay = [](const rapidjson::Value& repo_metrics) -> bool { + return check_exact_metrics_value(repo_metrics, "full", "false") + && check_exact_metrics_value(repo_metrics, "running", "false") + && check_exact_metrics_value(repo_metrics, "size", "0") + && check_exact_metrics_value(repo_metrics, "maxSize", "0") + && check_exact_metrics_value(repo_metrics, "entryCount", "0") + && check_exact_metrics_value(repo_metrics, "rocksDbTableReadersSize", "0") + && check_exact_metrics_value(repo_metrics, "rocksDbAllMemoryTablesSize", "2048") + && repo_metrics.HasMember("rocksDbBlockCacheUsage") + && 
repo_metrics.HasMember("rocksDbBlockCachePinnedUsage"); + }; + + return repo_is_okay(provenance_repo_metrics) && repo_is_okay(flow_file_repo_metrics); } TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs", "[LogMetricsPublisher]") { @@ -96,34 +131,14 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs" configuration_->set(Configure::nifi_metrics_publisher_metrics, "RepositoryMetrics,DeviceInfoNode"); publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; - std::string expected_log_1 = R"([info] { - "LogMetrics": {)"; - std::string expected_log_2 = R"("RepositoryMetrics": { - "provenancerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - }, - "flowfilerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - } - })"; - std::string expected_log_3 = R"("deviceInfo": { - "identifier":)"; - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_1)); - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_2)); - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_3)); + REQUIRE(utils::verifyEventHappenedInPollTime( + 5s, + [] { + const auto logs = LogTestController::getInstance().getLogs(); + const auto message_views = utils::extractLogMessageViews(logs); + return ranges::any_of(message_views, [](const auto& msg_view) { return isExpectedRepositoryMetricsLogMessage(msg_view, "info"); }); + }, + 100ms)); } TEST_CASE_METHOD(LogPublisherTestFixture, "Verify reloading different metrics", "[LogMetricsPublisher]") { @@ -132,42 +147,25 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify reloading different metrics", 
configuration_->set(Configure::nifi_metrics_publisher_metrics, "RepositoryMetrics"); publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; - std::string expected_log = R"([info] { - "LogMetrics": { - "RepositoryMetrics": { - "provenancerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - }, - "flowfilerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - } - } - } -})"; - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log)); + + REQUIRE(utils::verifyEventHappenedInPollTime( + 5s, + [] { + const auto logs = LogTestController::getInstance().getLogs(); + const auto message_views = utils::extractLogMessageViews(logs); + return ranges::any_of(message_views, [](const auto& msg_view) { return isExpectedRepositoryMetricsLogMessage(msg_view, "info"); }); + }, + 100ms)); publisher_->clearMetricNodes(); LogTestController::getInstance().reset(); LogTestController::getInstance().setTrace<minifi::state::LogMetricsPublisher>(); configuration_->set(Configure::nifi_metrics_publisher_metrics, "DeviceInfoNode"); publisher_->loadMetricNodes(); - expected_log = R"([info] { + std::string expected_log = R"([info] { "LogMetrics": { "deviceInfo": { "identifier":)"; - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log)); + REQUIRE(utils::verifyLogLinePresenceInPollTime(5s, expected_log)); } TEST_CASE_METHOD(LogPublisherTestFixture, "Verify generic and publisher specific metric properties", "[LogMetricsPublisher]") { @@ -185,32 +183,14 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify generic and publisher specific } publisher_->initialize(configuration_, response_node_loader_); 
publisher_->loadMetricNodes(); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; - std::string expected_log = R"([info] { - "LogMetrics": { - "RepositoryMetrics": { - "provenancerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - }, - "flowfilerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - } - } - } -})"; - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log)); + REQUIRE(utils::verifyEventHappenedInPollTime( + 5s, + [] { + const auto logs = LogTestController::getInstance().getLogs(); + const auto message_views = utils::extractLogMessageViews(logs); + return ranges::any_of(message_views, [](const auto& msg_view) { return isExpectedRepositoryMetricsLogMessage(msg_view, "info"); }); + }, + 100ms)); } TEST_CASE_METHOD(LogPublisherTestFixture, "Verify changing log level property for logging", "[LogMetricsPublisher]") { @@ -220,32 +200,14 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify changing log level property fo configuration_->set(Configure::nifi_metrics_publisher_metrics, "RepositoryMetrics"); publisher_->initialize(configuration_, response_node_loader_); publisher_->loadMetricNodes(); - using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; - std::string expected_log = R"([debug] { - "LogMetrics": { - "RepositoryMetrics": { - "provenancerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - }, - "flowfilerepository": { - "running": "false", - "full": "false", - "size": "0", - "maxSize": "0", - "entryCount": "0", - "rocksDbTableReadersSize": "0", - "rocksDbAllMemoryTablesSize": "2048" - } - } - } -})"; - 
REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log)); + REQUIRE(utils::verifyEventHappenedInPollTime( + 5s, + [] { + const auto logs = LogTestController::getInstance().getLogs(); + const auto message_views = utils::extractLogMessageViews(logs); + return ranges::any_of(message_views, [](const auto& msg_view) { return isExpectedRepositoryMetricsLogMessage(msg_view, "debug"); }); + }, + 100ms)); } } // namespace org::apache::nifi::minifi::test diff --git a/libminifi/test/unit/MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp index 25840d2e6..4fe02defd 100644 --- a/libminifi/test/unit/MetricsTests.cpp +++ b/libminifi/test/unit/MetricsTests.cpp @@ -104,7 +104,7 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { SECTION("RocksDB repository") { repo = std::make_shared<TestRocksDbRepository>(); - expected_metric_count = 7; + expected_metric_count = 9; } @@ -125,6 +125,8 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { if (expected_metric_count > 5) { checkSerializedValue(resp.children, "rocksDbTableReadersSize", "100"); checkSerializedValue(resp.children, "rocksDbAllMemoryTablesSize", "200"); + checkSerializedValue(resp.children, "rocksDbBlockCacheUsage", "85"); + checkSerializedValue(resp.children, "rocksDbBlockCachePinnedUsage", "50"); } } diff --git a/minifi-api/include/minifi-cpp/core/RepositoryMetricsSource.h b/minifi-api/include/minifi-cpp/core/RepositoryMetricsSource.h index ca55e461c..21f2308d1 100644 --- a/minifi-api/include/minifi-cpp/core/RepositoryMetricsSource.h +++ b/minifi-api/include/minifi-cpp/core/RepositoryMetricsSource.h @@ -28,6 +28,8 @@ class RepositoryMetricsSource { struct RocksDbStats { uint64_t table_readers_size{}; uint64_t all_memory_tables_size{}; + uint64_t block_cache_usage{}; + uint64_t block_cache_pinned_usage{}; }; virtual ~RepositoryMetricsSource() = default; diff --git a/minifi-api/include/minifi-cpp/properties/Configuration.h b/minifi-api/include/minifi-cpp/properties/Configuration.h index 8ed94061b..44e50ea0f 100644 
--- a/minifi-api/include/minifi-cpp/properties/Configuration.h +++ b/minifi-api/include/minifi-cpp/properties/Configuration.h @@ -76,6 +76,7 @@ class Configuration : public virtual Properties { static constexpr const char *nifi_flowfile_repository_rocksdb_read_verify_checksums = "nifi.flowfile.repository.rocksdb.read.verify.checksums"; static constexpr const char *nifi_provenance_repository_rocksdb_read_verify_checksums = "nifi.provenance.repository.rocksdb.read.verify.checksums"; static constexpr const char *nifi_rocksdb_state_storage_read_verify_checksums = "nifi.rocksdb.state.storage.read.verify.checksums"; + static constexpr const char *nifi_dbcontent_optimize_for_small_db_cache_size = "nifi.database.content.repository.optimize.for.small.db.cache.size"; static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure"; static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
