This is an automated email from the ASF dual-hosted git repository.
fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 91b080b4e MINIFICPP-2082 Move RocksDB stats to RepositoryMetrics
91b080b4e is described below
commit 91b080b4e0e3484b6169f010e692b09b9da64658
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed Mar 22 12:43:04 2023 +0100
MINIFICPP-2082 Move RocksDB stats to RepositoryMetrics
Signed-off-by: Ferenc Gerlits <[email protected]>
This closes #1540
---
METRICS.md | 40 +++++++-----
.../cluster/checkers/PrometheusChecker.py | 9 ++-
.../rocksdb-repos/DatabaseContentRepository.cpp | 9 +++
.../rocksdb-repos/DatabaseContentRepository.h | 1 +
extensions/rocksdb-repos/FlowFileRepository.cpp | 6 --
extensions/rocksdb-repos/ProvenanceRepository.cpp | 13 ----
extensions/rocksdb-repos/ProvenanceRepository.h | 2 +-
extensions/rocksdb-repos/RocksDbRepository.cpp | 15 +----
extensions/rocksdb-repos/RocksDbRepository.h | 2 +-
extensions/rocksdb-repos/database/OpenRocksDb.cpp | 21 ++++++
extensions/rocksdb-repos/database/OpenRocksDb.h | 4 ++
libminifi/include/core/RepositoryMetricsSource.h | 10 +++
.../include/core/state/nodes/AgentInformation.h | 73 ++++++---------------
.../include/core/state/nodes/RepositoryMetrics.h | 61 ++++-------------
.../nodes/RepositoryMetricsSourceStore.h} | 39 ++++++-----
.../state/nodes/RepositoryMetricsSourceStore.cpp | 76 ++++++++++++++++++++++
libminifi/test/rocksdb-tests/RepoTests.cpp | 16 +++++
libminifi/test/unit/LogMetricsPublisherTests.cpp | 34 +++++++---
libminifi/test/unit/MetricsTests.cpp | 32 +++++++--
libminifi/test/unit/ProvenanceTestHelper.h | 10 +++
20 files changed, 284 insertions(+), 189 deletions(-)
diff --git a/METRICS.md b/METRICS.md
index f384fe570..ee73a0018 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -143,13 +143,15 @@ QueueMetrics is a system level metric that reports queue metrics for every conne
RepositoryMetrics is a system level metric that reports metrics for the registered repositories (by default flowfile, content, and provenance repositories)
-| Metric name | Labels | Description |
-|---------------------------|-----------------|-------------------------------------------------|
-| is_running | repository_name | Is the repository running (1 or 0) |
-| is_full | repository_name | Is the repository full (1 or 0) |
-| repository_size_bytes | repository_name | Current size of the repository |
-| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) |
-| repository_entry_count | repository_name | Current number of entries in the repository |
+| Metric name | Labels | Description |
+|--------------------------------------|-----------------|------------------------------------------------------------------------------------------------------------------|
+| is_running | repository_name | Is the repository running (1 or 0) |
+| is_full | repository_name | Is the repository full (1 or 0) |
+| repository_size_bytes | repository_name | Current size of the repository |
+| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) |
+| repository_entry_count | repository_name | Current number of entries in the repository |
+| rocksdb_table_readers_size_bytes | repository_name | RocksDB's estimated memory used for reading SST tables (only present if repository uses RocksDB) |
+| rocksdb_all_memory_tables_size_bytes | repository_name | RocksDB's approximate size of active and unflushed immutable memtables (only present if repository uses RocksDB) |
| Label | Description |
|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------|
@@ -188,17 +190,19 @@ FlowInformation is a system level metric that reports component and queue relate
AgentStatus is a system level metric that defines current agent status including repository, component and resource usage information.
-| Metric name | Labels | Description |
-|---------------------------|--------------------------------|------------------------------------------------------------------------------------------------------------|
-| is_running | repository_name | Is the repository running (1 or 0) |
-| is_full | repository_name | Is the repository full (1 or 0) |
-| repository_size_bytes | repository_name | Current size of the repository |
-| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) |
-| repository_entry_count | repository_name | Current number of entries in the repository |
-| uptime_milliseconds | - | Agent uptime in milliseconds |
-| is_running | component_uuid, component_name | Check if the component is running (1 or 0) |
-| agent_memory_usage_bytes | - | Memory used by the agent process in bytes |
-| agent_cpu_utilization | - | CPU utilization of the agent process (between 0 and 1). In case of a query error the returned value is -1. |
+| Metric name | Labels | Description |
+|--------------------------------------|--------------------------------|------------------------------------------------------------------------------------------------------------------|
+| is_running | repository_name | Is the repository running (1 or 0) |
+| is_full | repository_name | Is the repository full (1 or 0) |
+| repository_size_bytes | repository_name | Current size of the repository |
+| max_repository_size_bytes | repository_name | Maximum size of the repository (0 if unlimited) |
+| repository_entry_count | repository_name | Current number of entries in the repository |
+| rocksdb_table_readers_size_bytes | repository_name | RocksDB's estimated memory used for reading SST tables (only present if repository uses RocksDB) |
+| rocksdb_all_memory_tables_size_bytes | repository_name | RocksDB's approximate size of active and unflushed immutable memtables (only present if repository uses RocksDB) |
+| uptime_milliseconds | - | Agent uptime in milliseconds |
+| is_running | component_uuid, component_name | Check if the component is running (1 or 0) |
+| agent_memory_usage_bytes | - | Memory used by the agent process in bytes |
+| agent_cpu_utilization | - | CPU utilization of the agent process (between 0 and 1). In case of a query error the returned value is -1. |
| Label | Description |
|-----------------|----------------------------------------------------------|
diff --git a/docker/test/integration/cluster/checkers/PrometheusChecker.py
b/docker/test/integration/cluster/checkers/PrometheusChecker.py
index bd7daff8b..2c1f1140c 100644
--- a/docker/test/integration/cluster/checkers/PrometheusChecker.py
+++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py
@@ -48,8 +48,10 @@ class PrometheusChecker:
def verify_repository_metrics(self):
label_list = [{'repository_name': 'provenance'}, {'repository_name':
'flowfile'}, {'repository_name': 'content'}]
+ # Only flowfile and content repositories are using rocksdb by default, so rocksdb specific metrics are only present there
return all((self.verify_metrics_exist(['minifi_is_running',
'minifi_is_full', 'minifi_repository_size_bytes',
'minifi_max_repository_size_bytes', 'minifi_repository_entry_count'],
'RepositoryMetrics', labels) for labels in label_list)) and \
-
all((self.verify_metric_larger_than_zero('minifi_repository_size_bytes',
'RepositoryMetrics', labels) for labels in label_list[1:3]))
+
all((self.verify_metric_larger_than_zero('minifi_repository_size_bytes',
'RepositoryMetrics', labels) for labels in label_list[1:3])) and \
+
all((self.verify_metrics_exist(['minifi_rocksdb_table_readers_size_bytes',
'minifi_rocksdb_all_memory_tables_size_bytes'], 'RepositoryMetrics', labels)
for labels in label_list[1:3]))
def verify_queue_metrics(self):
return self.verify_metrics_exist(['minifi_queue_data_size',
'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'],
'QueueMetrics')
@@ -74,12 +76,15 @@ class PrometheusChecker:
def verify_agent_status_metrics(self):
label_list = [{'repository_name': 'flowfile'}, {'repository_name':
'content'}]
+ # Only flowfile and content repositories are using rocksdb by default, so rocksdb specific metrics are only present there
for labels in label_list:
if not (self.verify_metric_exists('minifi_is_running',
'AgentStatus', labels)
and self.verify_metric_exists('minifi_is_full',
'AgentStatus', labels)
and
self.verify_metric_exists('minifi_max_repository_size_bytes', 'AgentStatus',
labels)
and
self.verify_metric_larger_than_zero('minifi_repository_size_bytes',
'AgentStatus', labels)
- and
self.verify_metric_exists('minifi_repository_entry_count', 'AgentStatus',
labels)):
+ and
self.verify_metric_exists('minifi_repository_entry_count', 'AgentStatus',
labels)
+ and
self.verify_metric_exists('minifi_rocksdb_table_readers_size_bytes',
'AgentStatus', labels)
+ and
self.verify_metric_exists('minifi_rocksdb_all_memory_tables_size_bytes',
'AgentStatus', labels)):
return False
# provenance repository is NoOpRepository by default which has zero size
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 56c0d2c83..6994a482a 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -286,6 +286,15 @@ uint64_t
DatabaseContentRepository::getRepositoryEntryCount() const {
})).value_or(0);
}
+std::optional<RepositoryMetricsSource::RocksDbStats>
DatabaseContentRepository::getRocksDbStats() const {
+ auto opendb = db_->open();
+ if (!opendb) {
+ return RocksDbStats{};
+ }
+
+ return opendb->getStats();
+}
+
REGISTER_RESOURCE_AS(DatabaseContentRepository, InternalResource,
("DatabaseContentRepository", "databasecontentrepository"));
} // namespace org::apache::nifi::minifi::core::repository
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h
b/extensions/rocksdb-repos/DatabaseContentRepository.h
index e084a6515..6982427df 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -76,6 +76,7 @@ class DatabaseContentRepository : public
core::ContentRepository {
uint64_t getRepositorySize() const override;
uint64_t getRepositoryEntryCount() const override;
+ std::optional<RepositoryMetricsSource::RocksDbStats> getRocksDbStats() const
override;
protected:
bool removeKey(const std::string& content_path) override;
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp
b/extensions/rocksdb-repos/FlowFileRepository.cpp
index b38a7fdce..7144dbc63 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -112,15 +112,9 @@ void
FlowFileRepository::deserializeFlowFilesWithNoContentClaim(minifi::internal
}
void FlowFileRepository::run() {
- auto last = std::chrono::steady_clock::now();
while (isRunning()) {
std::this_thread::sleep_for(purge_period_);
flush();
- auto now = std::chrono::steady_clock::now();
- if ((now-last) > std::chrono::seconds(30)) {
- printStats();
- last = now;
- }
}
flush();
}
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp
b/extensions/rocksdb-repos/ProvenanceRepository.cpp
index 15073835e..d46b3d048 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.cpp
+++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp
@@ -23,19 +23,6 @@
namespace org::apache::nifi::minifi::provenance {
-void ProvenanceRepository::run() {
- size_t count = 0;
- while (isRunning()) {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- count++;
- // Hack, to be removed in scope of https://issues.apache.org/jira/browse/MINIFICPP-1145
- count = count % 30;
- if (count == 0) {
- printStats();
- }
- }
-}
-
bool ProvenanceRepository::initialize(const
std::shared_ptr<org::apache::nifi::minifi::Configure> &config) {
std::string value;
if (config->get(Configure::nifi_provenance_repository_directory_default,
value) && !value.empty()) {
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h
b/extensions/rocksdb-repos/ProvenanceRepository.h
index ca0b6a8ad..58fc7fa64 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.h
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -83,7 +83,7 @@ class ProvenanceRepository : public
core::repository::RocksDbRepository {
private:
// Run function for the thread
- void run() override;
+ void run() override {};
};
} // namespace org::apache::nifi::minifi::provenance
diff --git a/extensions/rocksdb-repos/RocksDbRepository.cpp
b/extensions/rocksdb-repos/RocksDbRepository.cpp
index 7fda56f8e..6de9fe1a7 100644
--- a/extensions/rocksdb-repos/RocksDbRepository.cpp
+++ b/extensions/rocksdb-repos/RocksDbRepository.cpp
@@ -21,22 +21,13 @@ using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::core::repository {
-void RocksDbRepository::printStats() {
+std::optional<RepositoryMetricsSource::RocksDbStats>
RocksDbRepository::getRocksDbStats() const {
auto opendb = db_->open();
if (!opendb) {
- return;
+ return RocksDbStats{};
}
- std::string key_count;
- opendb->GetProperty("rocksdb.estimate-num-keys", &key_count);
- std::string table_readers;
- opendb->GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
-
- std::string all_memtables;
- opendb->GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
-
- logger_->log_info("Repository stats: key count: %s, table readers size: %s,
all memory tables size: %s",
- key_count, table_readers, all_memtables);
+ return opendb->getStats();
}
bool RocksDbRepository::ExecuteWithRetry(const
std::function<rocksdb::Status()>& operation) {
diff --git a/extensions/rocksdb-repos/RocksDbRepository.h
b/extensions/rocksdb-repos/RocksDbRepository.h
index 6fd220d8d..88ab000ce 100644
--- a/extensions/rocksdb-repos/RocksDbRepository.h
+++ b/extensions/rocksdb-repos/RocksDbRepository.h
@@ -46,7 +46,7 @@ class RocksDbRepository : public ThreadedRepository {
uint64_t getRepositorySize() const override;
uint64_t getRepositoryEntryCount() const override;
- void printStats();
+ std::optional<RocksDbStats> getRocksDbStats() const override;
protected:
bool ExecuteWithRetry(const std::function<rocksdb::Status()>& operation);
diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.cpp
b/extensions/rocksdb-repos/database/OpenRocksDb.cpp
index 703c66a39..c48c4b62c 100644
--- a/extensions/rocksdb-repos/database/OpenRocksDb.cpp
+++ b/extensions/rocksdb-repos/database/OpenRocksDb.cpp
@@ -135,4 +135,25 @@ std::optional<uint64_t> OpenRocksDb::getApproximateSizes()
const {
return std::nullopt;
}
+minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() {
+ minifi::core::RepositoryMetricsSource::RocksDbStats stats;
+ std::string table_readers;
+ GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
+ try {
+ stats.table_readers_size = std::stoull(table_readers);
+ } catch (const std::exception&) {
+ logger_->log_warn("Could not retrieve valid
'rocksdb.estimate-table-readers-mem' property value from rocksdb content
repository!");
+ }
+
+ std::string all_memtables;
+ GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
+ try {
+ stats.all_memory_tables_size = std::stoull(all_memtables);
+ } catch (const std::exception&) {
+ logger_->log_warn("Could not retrieve valid
'rocksdb.cur-size-all-mem-tables' property value from rocksdb content
repository!");
+ }
+
+ return stats;
+}
+
} // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.h
b/extensions/rocksdb-repos/database/OpenRocksDb.h
index b4d964381..d65aad689 100644
--- a/extensions/rocksdb-repos/database/OpenRocksDb.h
+++ b/extensions/rocksdb-repos/database/OpenRocksDb.h
@@ -27,6 +27,8 @@
#include "rocksdb/db.h"
#include "rocksdb/utilities/checkpoint.h"
#include "WriteBatch.h"
+#include "core/RepositoryMetricsSource.h"
+#include "core/logging/LoggerConfiguration.h"
namespace org::apache::nifi::minifi::internal {
@@ -73,6 +75,7 @@ class OpenRocksDb {
rocksdb::DB* get();
std::optional<uint64_t> getApproximateSizes() const;
+ minifi::core::RepositoryMetricsSource::RocksDbStats getStats();
private:
void handleResult(const rocksdb::Status& result);
@@ -81,6 +84,7 @@ class OpenRocksDb {
gsl::not_null<RocksDbInstance*> db_;
gsl::not_null<std::shared_ptr<rocksdb::DB>> impl_;
gsl::not_null<std::shared_ptr<ColumnHandle>> column_;
+ std::shared_ptr<minifi::core::logging::Logger>
logger_{minifi::core::logging::LoggerFactory<OpenRocksDb>::getLogger()};
};
} // namespace org::apache::nifi::minifi::internal
diff --git a/libminifi/include/core/RepositoryMetricsSource.h
b/libminifi/include/core/RepositoryMetricsSource.h
index cfaeb4ae6..6c933b7eb 100644
--- a/libminifi/include/core/RepositoryMetricsSource.h
+++ b/libminifi/include/core/RepositoryMetricsSource.h
@@ -19,11 +19,17 @@
#pragma once
#include <string>
+#include <optional>
namespace org::apache::nifi::minifi::core {
class RepositoryMetricsSource {
public:
+ struct RocksDbStats {
+ uint64_t table_readers_size{};
+ uint64_t all_memory_tables_size{};
+ };
+
virtual ~RepositoryMetricsSource() = default;
virtual uint64_t getRepositorySize() const = 0;
virtual uint64_t getRepositoryEntryCount() const = 0;
@@ -40,6 +46,10 @@ class RepositoryMetricsSource {
virtual bool isRunning() const {
return true;
}
+
+ virtual std::optional<RocksDbStats> getRocksDbStats() const {
+ return std::nullopt;
+ }
};
} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h
b/libminifi/include/core/state/nodes/AgentInformation.h
index 0d18c721e..97b330d49 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -61,7 +61,7 @@
#include "core/AgentIdentificationProvider.h"
#include "utils/Export.h"
#include "SupportedOperations.h"
-#include "core/RepositoryMetricsSource.h"
+#include "RepositoryMetricsSourceStore.h"
namespace org::apache::nifi::minifi::state::response {
@@ -411,11 +411,18 @@ class Bundles : public DeviceInformation {
class AgentStatus : public StateMonitorNode {
public:
AgentStatus(std::string name, const utils::Identifier& uuid)
- : StateMonitorNode(std::move(name), uuid) {
+ : StateMonitorNode(std::move(name), uuid),
+ repository_metrics_source_store_(getName()) {
}
explicit AgentStatus(std::string name)
- : StateMonitorNode(std::move(name)) {
+ : StateMonitorNode(std::move(name)),
+ repository_metrics_source_store_(getName()) {
+ }
+
+ explicit AgentStatus(std::string name, std::string parent_metrics_name)
+ : StateMonitorNode(std::move(name)),
+ repository_metrics_source_store_(std::move(parent_metrics_name)) {
}
MINIFIAPI static constexpr const char* Description = "Metric node that
defines current agent status including repository, component and resource usage
information.";
@@ -424,14 +431,12 @@ class AgentStatus : public StateMonitorNode {
return "AgentStatus";
}
- void setRepositories(const std::map<std::string,
std::shared_ptr<core::RepositoryMetricsSource>> &repositories) {
- repositories_ = repositories;
+ void setRepositories(const
std::vector<std::shared_ptr<core::RepositoryMetricsSource>> &repositories) {
+ repository_metrics_source_store_.setRepositories(repositories);
}
void addRepository(const std::shared_ptr<core::RepositoryMetricsSource>
&repo) {
- if (nullptr != repo) {
- repositories_.insert(std::make_pair(repo->getRepositoryName(), repo));
- }
+ repository_metrics_source_store_.addRepository(repo);
}
std::vector<SerializedResponseNode> serialize() override {
@@ -453,14 +458,7 @@ class AgentStatus : public StateMonitorNode {
}
std::vector<PublishedMetric> calculateMetrics() override {
- std::vector<PublishedMetric> metrics;
- for (const auto& [_, repo] : repositories_) {
- metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0),
{{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}});
- metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0),
{{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}});
- metrics.push_back({"repository_size_bytes",
static_cast<double>(repo->getRepositorySize()), {{"metric_class", getName()},
{"repository_name", repo->getRepositoryName()}}});
- metrics.push_back({"max_repository_size_bytes",
static_cast<double>(repo->getMaxRepositorySize()), {{"metric_class",
getName()}, {"repository_name", repo->getRepositoryName()}}});
- metrics.push_back({"repository_entry_count",
static_cast<double>(repo->getRepositoryEntryCount()), {{"metric_class",
getName()}, {"repository_name", repo->getRepositoryName()}}});
- }
+ auto metrics = repository_metrics_source_store_.calculateMetrics();
if (nullptr != monitor_) {
auto uptime = monitor_->getUptime();
metrics.push_back({"uptime_milliseconds", static_cast<double>(uptime),
{{"metric_class", getName()}}});
@@ -487,41 +485,8 @@ class AgentStatus : public StateMonitorNode {
protected:
SerializedResponseNode serializeRepositories() const {
SerializedResponseNode repositories;
-
repositories.name = "repositories";
-
- for (const auto& repo : repositories_) {
- SerializedResponseNode repo_node;
- repo_node.collapsible = false;
- repo_node.name = repo.first;
-
- SerializedResponseNode repo_size;
- repo_size.name = "size";
- repo_size.value = repo.second->getRepositorySize();
-
- SerializedResponseNode max_repo_size;
- max_repo_size.name = "maxSize";
- max_repo_size.value = repo.second->getMaxRepositorySize();
-
- SerializedResponseNode repo_entry_count;
- repo_entry_count.name = "entryCount";
- repo_entry_count.value = repo.second->getRepositoryEntryCount();
-
- SerializedResponseNode is_running;
- is_running.name = "running";
- is_running.value = repo.second->isRunning();
-
- SerializedResponseNode is_full;
- is_full.name = "full";
- is_full.value = repo.second->isFull();
-
- repo_node.children.push_back(repo_size);
- repo_node.children.push_back(max_repo_size);
- repo_node.children.push_back(repo_entry_count);
- repo_node.children.push_back(is_running);
- repo_node.children.push_back(is_full);
- repositories.children.push_back(repo_node);
- }
+ repositories.children = repository_metrics_source_store_.serialize();
return repositories;
}
@@ -591,7 +556,7 @@ class AgentStatus : public StateMonitorNode {
return resource_consumption;
}
- std::map<std::string, std::shared_ptr<core::RepositoryMetricsSource>>
repositories_;
+ RepositoryMetricsSourceStore repository_metrics_source_store_;
MINIFIAPI static utils::ProcessCpuUsageTracker cpu_load_tracker_;
MINIFIAPI static std::mutex cpu_load_tracker_mutex_;
@@ -623,7 +588,7 @@ class AgentMonitor {
}
void addRepository(const std::shared_ptr<core::RepositoryMetricsSource>
&repo) {
if (nullptr != repo) {
- repositories_.insert(std::make_pair(repo->getRepositoryName(), repo));
+ repositories_.push_back(repo);
}
}
@@ -632,7 +597,7 @@ class AgentMonitor {
}
protected:
- std::map<std::string, std::shared_ptr<core::RepositoryMetricsSource>>
repositories_;
+ std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repositories_;
state::StateMonitor* monitor_ = nullptr;
};
@@ -762,7 +727,7 @@ class AgentNode : public DeviceInformation, public
AgentMonitor, public AgentIde
std::vector<SerializedResponseNode> getAgentStatus() const {
std::vector<SerializedResponseNode> serialized;
- AgentStatus status("status");
+ AgentStatus status("status", getName());
status.setRepositories(repositories_);
status.setStateMonitor(monitor_);
diff --git a/libminifi/include/core/state/nodes/RepositoryMetrics.h
b/libminifi/include/core/state/nodes/RepositoryMetrics.h
index bc8dada72..788beb006 100644
--- a/libminifi/include/core/state/nodes/RepositoryMetrics.h
+++ b/libminifi/include/core/state/nodes/RepositoryMetrics.h
@@ -26,7 +26,7 @@
#include "../nodes/MetricsBase.h"
#include "Connection.h"
-#include "core/RepositoryMetricsSource.h"
+#include "RepositoryMetricsSourceStore.h"
namespace org::apache::nifi::minifi::state::response {
@@ -38,15 +38,18 @@ namespace org::apache::nifi::minifi::state::response {
class RepositoryMetrics : public ResponseNode {
public:
RepositoryMetrics(std::string name, const utils::Identifier &uuid)
- : ResponseNode(std::move(name), uuid) {
+ : ResponseNode(std::move(name), uuid),
+ repository_metrics_source_store_(getName()) {
}
explicit RepositoryMetrics(std::string name)
- : ResponseNode(std::move(name)) {
+ : ResponseNode(std::move(name)),
+ repository_metrics_source_store_(getName()) {
}
RepositoryMetrics()
- : ResponseNode("RepositoryMetrics") {
+ : ResponseNode("RepositoryMetrics"),
+ repository_metrics_source_store_(getName()) {
}
MINIFIAPI static constexpr const char* Description = "Metric node that
defines repository metric information";
@@ -56,61 +59,19 @@ class RepositoryMetrics : public ResponseNode {
}
void addRepository(const std::shared_ptr<core::RepositoryMetricsSource>
&repo) {
- if (nullptr != repo) {
- repositories_.push_back(repo);
- }
+ return repository_metrics_source_store_.addRepository(repo);
}
std::vector<SerializedResponseNode> serialize() override {
- std::vector<SerializedResponseNode> serialized;
- for (const auto& repo : repositories_) {
- SerializedResponseNode parent;
- parent.name = repo->getRepositoryName();
- SerializedResponseNode is_running;
- is_running.name = "running";
- is_running.value = repo->isRunning();
-
- SerializedResponseNode is_full;
- is_full.name = "full";
- is_full.value = repo->isFull();
-
- SerializedResponseNode repo_size;
- repo_size.name = "size";
- repo_size.value = std::to_string(repo->getRepositorySize());
-
- SerializedResponseNode max_repo_size;
- max_repo_size.name = "maxSize";
- max_repo_size.value = std::to_string(repo->getMaxRepositorySize());
-
- SerializedResponseNode repo_entry_count;
- repo_entry_count.name = "entryCount";
- repo_entry_count.value = repo->getRepositoryEntryCount();
-
- parent.children.push_back(is_running);
- parent.children.push_back(is_full);
- parent.children.push_back(repo_size);
- parent.children.push_back(max_repo_size);
- parent.children.push_back(repo_entry_count);
-
- serialized.push_back(parent);
- }
- return serialized;
+ return repository_metrics_source_store_.serialize();
}
std::vector<PublishedMetric> calculateMetrics() override {
- std::vector<PublishedMetric> metrics;
- for (const auto& repo : repositories_) {
- metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0),
{{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}});
- metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0),
{{"metric_class", getName()}, {"repository_name", repo->getRepositoryName()}}});
- metrics.push_back({"repository_size_bytes",
static_cast<double>(repo->getRepositorySize()), {{"metric_class", getName()},
{"repository_name", repo->getRepositoryName()}}});
- metrics.push_back({"max_repository_size_bytes",
static_cast<double>(repo->getMaxRepositorySize()), {{"metric_class",
getName()}, {"repository_name", repo->getRepositoryName()}}});
- metrics.push_back({"repository_entry_count",
static_cast<double>(repo->getRepositoryEntryCount()), {{"metric_class",
getName()}, {"repository_name", repo->getRepositoryName()}}});
- }
- return metrics;
+ return repository_metrics_source_store_.calculateMetrics();
}
protected:
- std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repositories_;
+ RepositoryMetricsSourceStore repository_metrics_source_store_;
};
} // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/include/core/RepositoryMetricsSource.h
b/libminifi/include/core/state/nodes/RepositoryMetricsSourceStore.h
similarity index 51%
copy from libminifi/include/core/RepositoryMetricsSource.h
copy to libminifi/include/core/state/nodes/RepositoryMetricsSourceStore.h
index cfaeb4ae6..b4fcc39f9 100644
--- a/libminifi/include/core/RepositoryMetricsSource.h
+++ b/libminifi/include/core/state/nodes/RepositoryMetricsSourceStore.h
@@ -15,31 +15,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
#pragma once
+#include <memory>
#include <string>
+#include <utility>
+#include <vector>
-namespace org::apache::nifi::minifi::core {
-
-class RepositoryMetricsSource {
- public:
- virtual ~RepositoryMetricsSource() = default;
- virtual uint64_t getRepositorySize() const = 0;
- virtual uint64_t getRepositoryEntryCount() const = 0;
- virtual std::string getRepositoryName() const = 0;
+#include "core/RepositoryMetricsSource.h"
+#include "core/state/Value.h"
+#include "core/state/PublishedMetricProvider.h"
- virtual uint64_t getMaxRepositorySize() const {
- return 0;
- }
+namespace org::apache::nifi::minifi::state::response {
- virtual bool isFull() const {
- return false;
- }
-
- virtual bool isRunning() const {
- return true;
- }
+class RepositoryMetricsSourceStore {
+ public:
+ explicit RepositoryMetricsSourceStore(std::string name);
+ void setRepositories(const
std::vector<std::shared_ptr<core::RepositoryMetricsSource>> &repositories);
+ void addRepository(const std::shared_ptr<core::RepositoryMetricsSource>
&repo);
+ std::vector<SerializedResponseNode> serialize() const;
+ std::vector<PublishedMetric> calculateMetrics() const;
+
+ private:
+ std::string name_;
+ std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repositories_;
};
-} // namespace org::apache::nifi::minifi::core
+} // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp
b/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp
new file mode 100644
index 000000000..0d4789f8c
--- /dev/null
+++ b/libminifi/src/core/state/nodes/RepositoryMetricsSourceStore.cpp
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/state/nodes/RepositoryMetricsSourceStore.h"
+
+namespace org::apache::nifi::minifi::state::response {
+
+RepositoryMetricsSourceStore::RepositoryMetricsSourceStore(std::string name) :
name_(std::move(name)) {}
+
+void RepositoryMetricsSourceStore::setRepositories(const
std::vector<std::shared_ptr<core::RepositoryMetricsSource>> &repositories) {
+ repositories_ = repositories;
+}
+
+void RepositoryMetricsSourceStore::addRepository(const
std::shared_ptr<core::RepositoryMetricsSource> &repo) {
+ if (nullptr != repo) {
+ repositories_.push_back(repo);
+ }
+}
+
+std::vector<SerializedResponseNode> RepositoryMetricsSourceStore::serialize()
const {
+ std::vector<SerializedResponseNode> serialized;
+ for (const auto& repo : repositories_) {
+ SerializedResponseNode parent = {
+ .name = repo->getRepositoryName(),
+ .children = {
+ {.name = "running", .value = repo->isRunning()},
+ {.name = "full", .value = repo->isFull()},
+ {.name = "size", .value = repo->getRepositorySize()},
+ {.name = "maxSize", .value = repo->getMaxRepositorySize()},
+ {.name = "entryCount", .value = repo->getRepositoryEntryCount()},
+ }
+ };
+
+ if (auto rocksdb_stats = repo->getRocksDbStats()) {
+ parent.children.push_back({.name = "rocksDbTableReadersSize", .value =
rocksdb_stats->table_readers_size});
+ parent.children.push_back({.name = "rocksDbAllMemoryTablesSize", .value
= rocksdb_stats->all_memory_tables_size});
+ }
+
+ serialized.push_back(parent);
+ }
+ return serialized;
+}
+
+// Produces the published metrics of every tracked repository. Each metric is
+// labelled with this store's name ("metric_class") and the repository's name
+// ("repository_name"). Boolean states are published as 1.0 / 0.0. Repositories
+// that report RocksDB statistics contribute two additional metrics.
+std::vector<PublishedMetric> RepositoryMetricsSourceStore::calculateMetrics() const {
+  std::vector<PublishedMetric> metrics;
+  // At most seven metrics per repository: five common ones plus two RocksDB-specific ones.
+  metrics.reserve(repositories_.size() * 7);
+  for (const auto& repo : repositories_) {
+    // Query the name once per repository instead of once per metric.
+    const auto& repository_name = repo->getRepositoryName();
+    // Single place where the shared label set is attached to a metric.
+    const auto add_metric = [&](const char* metric_name, double value) {
+      metrics.push_back({metric_name, value, {{"metric_class", name_}, {"repository_name", repository_name}}});
+    };
+
+    add_metric("is_running", repo->isRunning() ? 1.0 : 0.0);
+    add_metric("is_full", repo->isFull() ? 1.0 : 0.0);
+    add_metric("repository_size_bytes", static_cast<double>(repo->getRepositorySize()));
+    add_metric("max_repository_size_bytes", static_cast<double>(repo->getMaxRepositorySize()));
+    add_metric("repository_entry_count", static_cast<double>(repo->getRepositoryEntryCount()));
+    if (const auto rocksdb_stats = repo->getRocksDbStats()) {
+      add_metric("rocksdb_table_readers_size_bytes", static_cast<double>(rocksdb_stats->table_readers_size));
+      add_metric("rocksdb_all_memory_tables_size_bytes", static_cast<double>(rocksdb_stats->all_memory_tables_size));
+    }
+  }
+  return metrics;
+}
+
+} // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp
b/libminifi/test/rocksdb-tests/RepoTests.cpp
index 4a187cca6..6a5558831 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -21,6 +21,7 @@
#include <memory>
#include <string>
#include <thread>
+#include <optional>
#include "core/Core.h"
#include "core/repository/AtomicRepoEntries.h"
@@ -530,12 +531,15 @@ TEST_CASE("Test getting flow file repository size
properties", "[TestGettingRepo
std::shared_ptr<core::Repository> repository;
auto expected_is_full = false;
uint64_t expected_max_repo_size = 0;
+ bool expected_rocksdb_stats = false;
SECTION("FlowFileRepository") {
repository = std::make_shared<core::repository::FlowFileRepository>("ff",
dir.string(), 0ms, 0, 1ms);
+ expected_rocksdb_stats = true;
}
SECTION("ProvenanceRepository") {
repository =
std::make_shared<minifi::provenance::ProvenanceRepository>("ff", dir.string(),
0ms, 0, 1ms);
+ expected_rocksdb_stats = true;
}
SECTION("VolatileFlowFileRepository") {
@@ -584,6 +588,11 @@ TEST_CASE("Test getting flow file repository size
properties", "[TestGettingRepo
REQUIRE(expected_is_full == repository->isFull());
REQUIRE(expected_max_repo_size == repository->getMaxRepositorySize());
REQUIRE(2 == repository->getRepositoryEntryCount());
+ auto rocksdb_stats = repository->getRocksDbStats();
+ REQUIRE(expected_rocksdb_stats == (rocksdb_stats != std::nullopt));
+ if (rocksdb_stats) {
+ REQUIRE(rocksdb_stats->all_memory_tables_size > 0);
+ }
}
TEST_CASE("Test getting noop repository size properties",
"[TestGettingRepositorySize]") {
@@ -626,6 +635,7 @@ TEST_CASE("Test getting content repository size
properties", "[TestGettingReposi
std::shared_ptr<core::ContentRepository> content_repo;
auto expected_is_full = false;
uint64_t expected_max_repo_size = 0;
+ bool expected_rocksdb_stats = false;
SECTION("FileSystemRepository") {
content_repo = std::make_shared<core::repository::FileSystemRepository>();
}
@@ -638,6 +648,7 @@ TEST_CASE("Test getting content repository size
properties", "[TestGettingReposi
SECTION("DatabaseContentRepository") {
content_repo =
std::make_shared<core::repository::DatabaseContentRepository>();
+ expected_rocksdb_stats = true;
}
content_repo->initialize(configuration);
@@ -672,6 +683,11 @@ TEST_CASE("Test getting content repository size
properties", "[TestGettingReposi
REQUIRE(expected_is_full == content_repo->isFull());
REQUIRE(expected_max_repo_size == content_repo->getMaxRepositorySize());
REQUIRE(1 == content_repo->getRepositoryEntryCount());
+ auto rocksdb_stats = content_repo->getRocksDbStats();
+ REQUIRE(expected_rocksdb_stats == (rocksdb_stats != std::nullopt));
+ if (rocksdb_stats) {
+ REQUIRE(rocksdb_stats->all_memory_tables_size > 0);
+ }
}
TEST_CASE("Flow file repositories can be stopped", "[TestRepoIsRunning]") {
diff --git a/libminifi/test/unit/LogMetricsPublisherTests.cpp
b/libminifi/test/unit/LogMetricsPublisherTests.cpp
index 35d4ba4f2..c7fafb867 100644
--- a/libminifi/test/unit/LogMetricsPublisherTests.cpp
+++ b/libminifi/test/unit/LogMetricsPublisherTests.cpp
@@ -38,6 +38,8 @@ class LogPublisherTestFixture {
response_node_loader_(std::make_shared<state::response::ResponseNodeLoader>(configuration_,
std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{provenance_repo_,
flow_file_repo_}, nullptr)),
publisher_("LogMetricsPublisher") {
+ provenance_repo_->initialize(configuration_);
+ flow_file_repo_->initialize(configuration_);
}
protected:
@@ -89,14 +91,18 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple
metric nodes in logs"
"full": "false",
"size": "0",
"maxSize": "0",
- "entryCount": "0"
+ "entryCount": "0",
+ "rocksDbTableReadersSize": "0",
+ "rocksDbAllMemoryTablesSize": "2048"
},
"flowfilerepository": {
"running": "false",
"full": "false",
"size": "0",
"maxSize": "0",
- "entryCount": "0"
+ "entryCount": "0",
+ "rocksDbTableReadersSize": "0",
+ "rocksDbAllMemoryTablesSize": "2048"
}
},
"deviceInfo": {
@@ -119,14 +125,18 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify
reloading different metrics",
"full": "false",
"size": "0",
"maxSize": "0",
- "entryCount": "0"
+ "entryCount": "0",
+ "rocksDbTableReadersSize": "0",
+ "rocksDbAllMemoryTablesSize": "2048"
},
"flowfilerepository": {
"running": "false",
"full": "false",
"size": "0",
"maxSize": "0",
- "entryCount": "0"
+ "entryCount": "0",
+ "rocksDbTableReadersSize": "0",
+ "rocksDbAllMemoryTablesSize": "2048"
}
}
}
@@ -168,14 +178,18 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify generic
and publisher specific
"full": "false",
"size": "0",
"maxSize": "0",
- "entryCount": "0"
+ "entryCount": "0",
+ "rocksDbTableReadersSize": "0",
+ "rocksDbAllMemoryTablesSize": "2048"
},
"flowfilerepository": {
"running": "false",
"full": "false",
"size": "0",
"maxSize": "0",
- "entryCount": "0"
+ "entryCount": "0",
+ "rocksDbTableReadersSize": "0",
+ "rocksDbAllMemoryTablesSize": "2048"
}
}
}
@@ -199,14 +213,18 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify
changing log level property fo
"full": "false",
"size": "0",
"maxSize": "0",
- "entryCount": "0"
+ "entryCount": "0",
+ "rocksDbTableReadersSize": "0",
+ "rocksDbAllMemoryTablesSize": "2048"
},
"flowfilerepository": {
"running": "false",
"full": "false",
"size": "0",
"maxSize": "0",
- "entryCount": "0"
+ "entryCount": "0",
+ "rocksDbTableReadersSize": "0",
+ "rocksDbAllMemoryTablesSize": "2048"
}
}
}
diff --git a/libminifi/test/unit/MetricsTests.cpp
b/libminifi/test/unit/MetricsTests.cpp
index af6d9e963..5be43fb4f 100644
--- a/libminifi/test/unit/MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -90,7 +90,19 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
REQUIRE("RepositoryMetrics" == metrics.getName());
- auto repo = std::make_shared<TestThreadedRepository>();
+ std::shared_ptr<TestThreadedRepository> repo;
+ size_t expected_metric_count{};
+
+ SECTION("Non-RocksDB repository") {
+ repo = std::make_shared<TestThreadedRepository>();
+ expected_metric_count = 5;
+ }
+
+ SECTION("RocksDB repository") {
+ repo = std::make_shared<TestRocksDbRepository>();
+ expected_metric_count = 7;
+ }
+
metrics.addRepository(repo);
{
@@ -99,13 +111,17 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
minifi::state::response::SerializedResponseNode resp =
metrics.serialize().at(0);
REQUIRE("repo_name" == resp.name);
- REQUIRE(5 == resp.children.size());
+ REQUIRE(expected_metric_count == resp.children.size());
checkSerializedValue(resp.children, "running", "false");
checkSerializedValue(resp.children, "full", "false");
checkSerializedValue(resp.children, "size", "0");
checkSerializedValue(resp.children, "maxSize", "0");
checkSerializedValue(resp.children, "entryCount", "0");
+ if (expected_metric_count > 5) {
+ checkSerializedValue(resp.children, "rocksDbTableReadersSize", "100");
+ checkSerializedValue(resp.children, "rocksDbAllMemoryTablesSize", "200");
+ }
}
repo->start();
@@ -115,13 +131,17 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
minifi::state::response::SerializedResponseNode resp =
metrics.serialize().at(0);
REQUIRE("repo_name" == resp.name);
- REQUIRE(5 == resp.children.size());
+ REQUIRE(expected_metric_count == resp.children.size());
checkSerializedValue(resp.children, "running", "true");
checkSerializedValue(resp.children, "full", "false");
checkSerializedValue(resp.children, "size", "0");
checkSerializedValue(resp.children, "maxSize", "0");
checkSerializedValue(resp.children, "entryCount", "0");
+ if (expected_metric_count > 5) {
+ checkSerializedValue(resp.children, "rocksDbTableReadersSize", "100");
+ checkSerializedValue(resp.children, "rocksDbAllMemoryTablesSize", "200");
+ }
}
repo->stop();
@@ -132,13 +152,17 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
minifi::state::response::SerializedResponseNode resp =
metrics.serialize().at(0);
REQUIRE("repo_name" == resp.name);
- REQUIRE(5 == resp.children.size());
+ REQUIRE(expected_metric_count == resp.children.size());
checkSerializedValue(resp.children, "running", "false");
checkSerializedValue(resp.children, "full", "false");
checkSerializedValue(resp.children, "size", "0");
checkSerializedValue(resp.children, "maxSize", "0");
checkSerializedValue(resp.children, "entryCount", "0");
+ if (expected_metric_count > 5) {
+ checkSerializedValue(resp.children, "rocksDbTableReadersSize", "100");
+ checkSerializedValue(resp.children, "rocksDbAllMemoryTablesSize", "200");
+ }
}
}
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h
b/libminifi/test/unit/ProvenanceTestHelper.h
index ec979e172..ffc30bfaf 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -175,6 +175,16 @@ class TestThreadedRepository : public
TestRepositoryBase<org::apache::nifi::mini
std::thread thread_;
};
+// Test repository that always reports fixed RocksDB statistics
+// (table readers size 100, all memory tables size 200).
+class TestRocksDbRepository : public TestThreadedRepository {
+ public:
+  std::optional<RocksDbStats> getRocksDbStats() const override {
+    RocksDbStats stats{};
+    stats.table_readers_size = 100;
+    stats.all_memory_tables_size = 200;
+    return stats;
+  }
+};
+
class TestFlowRepository : public
org::apache::nifi::minifi::core::ThreadedRepository {
public:
TestFlowRepository()