This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit a469556a45b4f94c5f6f6729baf26ea670387a10
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Mon Jan 20 16:54:26 2025 +0100

    MINIFICPP-2503 Remove C2 metric duplication in heartbeat
    
    Closes #1908
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 .../prometheus/PrometheusMetricsPublisher.cpp      |  5 +++--
 .../include/core/state/nodes/ResponseNodeLoader.h  |  2 ++
 libminifi/src/c2/C2MetricsPublisher.cpp            | 25 +++++++++++++---------
 libminifi/src/core/state/LogMetricsPublisher.cpp   |  5 +++--
 .../src/core/state/MetricsPublisherFactory.cpp     |  7 +++---
 .../src/core/state/nodes/ResponseNodeLoader.cpp    |  9 ++++++++
 libminifi/test/integration/C2MetricsTest.cpp       | 15 ++++++++++---
 libminifi/test/unit/LogMetricsPublisherTests.cpp   | 16 ++++++++------
 8 files changed, 57 insertions(+), 27 deletions(-)

diff --git a/extensions/prometheus/PrometheusMetricsPublisher.cpp b/extensions/prometheus/PrometheusMetricsPublisher.cpp
index 36f239166..f4a010f08 100644
--- a/extensions/prometheus/PrometheusMetricsPublisher.cpp
+++ b/extensions/prometheus/PrometheusMetricsPublisher.cpp
@@ -85,8 +85,9 @@ std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>> Prom
     metric_classes_str = configuration_->get(minifi::Configuration::nifi_metrics_publisher_metrics);
   }
   if (metric_classes_str && !metric_classes_str->empty()) {
-    auto metric_classes = utils::string::split(*metric_classes_str, ",");
-    for (const std::string& clazz : metric_classes) {
+    auto metric_classes = utils::string::splitAndTrimRemovingEmpty(*metric_classes_str, ",");
+    std::unordered_set<std::string> unique_metric_classes{metric_classes.begin(), metric_classes.end()};
+    for (const std::string& clazz : unique_metric_classes) {
       auto response_nodes = response_node_loader_->loadResponseNodes(clazz);
       if (response_nodes.empty()) {
         logger_->log_warn("Metric class '{}' could not be loaded.", clazz);
diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
index 9eb55f413..31e8ac93f 100644
--- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h
+++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
@@ -69,9 +69,11 @@ class ResponseNodeLoader {
   mutable std::mutex root_mutex_;
   mutable std::mutex component_metrics_mutex_;
   mutable std::mutex system_metrics_mutex_;
+  mutable std::mutex initialization_mutex_;
   core::ProcessGroup* root_{};
   std::unordered_map<std::string, std::vector<SharedResponseNode>> component_metrics_;
   std::unordered_map<std::string, SharedResponseNode> system_metrics_;
+  std::unordered_set<std::string> initialized_metrics_;
   std::shared_ptr<Configure> configuration_;
   std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repository_metric_sources_;
   std::shared_ptr<core::FlowConfiguration> flow_configuration_;
diff --git a/libminifi/src/c2/C2MetricsPublisher.cpp b/libminifi/src/c2/C2MetricsPublisher.cpp
index a58b72bf4..3e90d36fd 100644
--- a/libminifi/src/c2/C2MetricsPublisher.cpp
+++ b/libminifi/src/c2/C2MetricsPublisher.cpp
@@ -41,8 +41,9 @@ namespace org::apache::nifi::minifi::c2 {
 
 void C2MetricsPublisher::loadNodeClasses(const std::string& class_definitions, const state::response::SharedResponseNode& new_node) {
   gsl_Expects(response_node_loader_);
-  auto classes = utils::string::split(class_definitions, ",");
-  for (const std::string& clazz : classes) {
+  auto classes = utils::string::splitAndTrimRemovingEmpty(class_definitions, ",");
+  std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};
+  for (const std::string& clazz : unique_classes) {
     auto response_nodes = response_node_loader_->loadResponseNodes(clazz);
     if (response_nodes.empty()) {
       continue;
@@ -60,9 +61,10 @@ void C2MetricsPublisher::loadC2ResponseConfiguration(const std::string &prefix)
     return;
   }
 
-  std::vector<std::string> classes = utils::string::split(class_definitions, ",");
+  auto classes = utils::string::splitAndTrimRemovingEmpty(class_definitions, ",");
+  std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};
 
-  for (const std::string& metricsClass : classes) {
+  for (const std::string& metricsClass : unique_classes) {
     try {
       std::string option = utils::string::join_pack(prefix, ".", metricsClass);
       std::string classOption = option + ".classes";
@@ -97,9 +99,10 @@ state::response::SharedResponseNode C2MetricsPublisher::loadC2ResponseConfigurat
   if (!configuration_->get(prefix, class_definitions)) {
     return prev_node;
   }
-  std::vector<std::string> classes = utils::string::split(class_definitions, ",");
+  auto classes = utils::string::splitAndTrimRemovingEmpty(class_definitions, ",");
+  std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};
 
-  for (const std::string& metricsClass : classes) {
+  for (const std::string& metricsClass : unique_classes) {
     try {
       std::string option = utils::string::join_pack(prefix, ".", metricsClass);
       std::string classOption = option + ".classes";
@@ -111,8 +114,9 @@ state::response::SharedResponseNode C2MetricsPublisher::loadC2ResponseConfigurat
       }
       state::response::SharedResponseNode new_node = gsl::make_not_null(std::make_shared<state::response::ObjectNode>(name));
       if (name.find(',') != std::string::npos) {
-        std::vector<std::string> sub_classes = utils::string::split(name, ",");
-        for (const std::string& subClassStr : classes) {
+        auto sub_classes = utils::string::splitAndTrimRemovingEmpty(name, ",");
+        std::unordered_set<std::string> unique_sub_classes{sub_classes.begin(), sub_classes.end()};
+        for (const std::string& subClassStr : unique_sub_classes) {
           auto node = loadC2ResponseConfiguration(subClassStr, prev_node);
           static_cast<state::response::ObjectNode*>(prev_node.get())->add_node(node);
         }
@@ -198,9 +202,10 @@ void C2MetricsPublisher::loadMetricNodes() {
   std::string class_csv;
   std::lock_guard<std::mutex> guard{metrics_mutex_};
   if (configuration_->get(minifi::Configuration::nifi_c2_root_classes, class_csv)) {
-    std::vector<std::string> classes = utils::string::split(class_csv, ",");
+    auto classes = utils::string::splitAndTrimRemovingEmpty(class_csv, ",");
+    std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};
 
-    for (const std::string& clazz : classes) {
+    for (const std::string& clazz : unique_classes) {
       auto response_nodes = response_node_loader_->loadResponseNodes(clazz);
       if (response_nodes.empty()) {
         continue;
diff --git a/libminifi/src/core/state/LogMetricsPublisher.cpp b/libminifi/src/core/state/LogMetricsPublisher.cpp
index e31428a5a..bb91a67a5 100644
--- a/libminifi/src/core/state/LogMetricsPublisher.cpp
+++ b/libminifi/src/core/state/LogMetricsPublisher.cpp
@@ -99,9 +99,10 @@ void LogMetricsPublisher::loadMetricNodes() {
     metric_classes_str = configuration_->get(minifi::Configuration::nifi_metrics_publisher_metrics);
   }
   if (metric_classes_str && !metric_classes_str->empty()) {
-    auto metric_classes = utils::string::split(*metric_classes_str, ",");
+    auto metric_classes = utils::string::splitAndTrimRemovingEmpty(*metric_classes_str, ",");
+    std::unordered_set<std::string> unique_metric_classes{metric_classes.begin(), metric_classes.end()};
     std::lock_guard<std::mutex> lock(response_nodes_mutex_);
-    for (const std::string& clazz : metric_classes) {
+    for (const std::string& clazz : unique_metric_classes) {
       auto loaded_response_nodes = response_node_loader_->loadResponseNodes(clazz);
       if (loaded_response_nodes.empty()) {
         logger_->log_warn("Metric class '{}' could not be loaded.", clazz);
diff --git a/libminifi/src/core/state/MetricsPublisherFactory.cpp b/libminifi/src/core/state/MetricsPublisherFactory.cpp
index aba0ddf11..d44d706dc 100644
--- a/libminifi/src/core/state/MetricsPublisherFactory.cpp
+++ b/libminifi/src/core/state/MetricsPublisherFactory.cpp
@@ -41,9 +41,10 @@ std::vector<gsl::not_null<std::unique_ptr<MetricsPublisher>>> createMetricsPubli
     const std::shared_ptr<Configure>& configuration, const std::shared_ptr<state::response::ResponseNodeLoader>& response_node_loader) {
   if (auto metrics_publisher_class_str = configuration->get(minifi::Configure::nifi_metrics_publisher_class)) {
     std::vector<gsl::not_null<std::unique_ptr<MetricsPublisher>>> publishers;
-    auto publisher_classes = minifi::utils::string::split(*metrics_publisher_class_str, ",");
-    publishers.reserve(publisher_classes.size());
-    for (const auto& publisher_class : publisher_classes) {
+    auto publisher_classes = minifi::utils::string::splitAndTrimRemovingEmpty(*metrics_publisher_class_str, ",");
+    std::unordered_set<std::string> unique_publisher_classes{publisher_classes.begin(), publisher_classes.end()};
+    publishers.reserve(unique_publisher_classes.size());
+    for (const auto& publisher_class : unique_publisher_classes) {
       publishers.push_back(createMetricsPublisher(publisher_class, configuration, response_node_loader));
     }
     return publishers;
diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
index 68b74da48..2d892a60b 100644
--- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
+++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
@@ -42,6 +42,10 @@ ResponseNodeLoader::ResponseNodeLoader(std::shared_ptr<Configure> configuration,
 }
 
 void ResponseNodeLoader::clearConfigRoot() {
+  {
+    std::lock_guard<std::mutex> guard(initialization_mutex_);
+    initialized_metrics_.clear();
+  }
   {
     std::lock_guard<std::mutex> guard(system_metrics_mutex_);
     system_metrics_.clear();
@@ -237,7 +241,11 @@ std::vector<SharedResponseNode> ResponseNodeLoader::loadResponseNodes(const std:
     return {};
   }
 
+  std::lock_guard<std::mutex> guard(initialization_mutex_);
   for (const auto& response_node : response_nodes) {
+    if (initialized_metrics_.contains(response_node->getName())) {
+      continue;
+    }
     initializeRepositoryMetrics(response_node);
     initializeQueueMetrics(response_node);
     initializeAgentIdentifier(response_node);
@@ -247,6 +255,7 @@ std::vector<SharedResponseNode> ResponseNodeLoader::loadResponseNodes(const std:
     initializeConfigurationChecksums(response_node);
     initializeFlowMonitor(response_node);
     initializeAssetInformation(response_node);
+    initialized_metrics_.insert(response_node->getName());
   }
   return response_nodes;
 }
diff --git a/libminifi/test/integration/C2MetricsTest.cpp b/libminifi/test/integration/C2MetricsTest.cpp
index 39a0a19c7..6065b5ecf 100644
--- a/libminifi/test/integration/C2MetricsTest.cpp
+++ b/libminifi/test/integration/C2MetricsTest.cpp
@@ -128,6 +128,8 @@ class MetricsHandler: public HeartbeatHandler {
 
   static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
     return runtime_metrics.HasMember("deviceInfo") &&
+      runtime_metrics["deviceInfo"]["systemInfo"].HasMember("operatingSystem") &&
+      runtime_metrics["deviceInfo"]["networkInfo"].HasMember("hostname") &&
       runtime_metrics.HasMember("flowInfo") &&
       runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
       runtime_metrics["flowInfo"].HasMember("queues") &&
@@ -135,11 +137,15 @@ class MetricsHandler: public HeartbeatHandler {
       runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997") &&
       runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
       runtime_metrics["flowInfo"]["components"].HasMember("GetTCP") &&
-      runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute");
+      runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") &&
+      runtime_metrics.HasMember("agentInfo") &&
+      runtime_metrics["agentInfo"]["status"]["repositories"]["ff"].HasMember("size");
   }
 
   static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
     return runtime_metrics.HasMember("deviceInfo") &&
+      runtime_metrics["deviceInfo"]["systemInfo"].HasMember("operatingSystem") &&
+      runtime_metrics["deviceInfo"]["networkInfo"].HasMember("hostname") &&
       runtime_metrics.HasMember("flowInfo") &&
       runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
       runtime_metrics["flowInfo"].HasMember("queues") &&
@@ -147,7 +153,9 @@ class MetricsHandler: public HeartbeatHandler {
       runtime_metrics["flowInfo"]["queues"].HasMember("8368e3c8-015a-1003-52ca-83af40ec1332") &&
       runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
       runtime_metrics["flowInfo"]["components"].HasMember("GenerateFlowFile") &&
-      runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute");
+      runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") &&
+      runtime_metrics.HasMember("agentInfo") &&
+      runtime_metrics["agentInfo"]["status"]["repositories"]["ff"].HasMember("size");
   }
 
   static bool verifyLoadMetrics(const rapidjson::Value& load_metrics) {
@@ -186,11 +194,12 @@ class MetricsHandler: public HeartbeatHandler {
 TEST_CASE("C2MetricsTest", "[c2test]") {
   std::atomic_bool metrics_updated_successfully{false};
   VerifyC2Metrics harness(metrics_updated_successfully);
+  harness.getConfiguration()->set("nifi.c2.root.classes", "FlowInformation,AgentInformation");
   harness.getConfiguration()->set("nifi.c2.root.class.definitions", "metrics");
   harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.name", "metrics");
   harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics", "runtimemetrics,loadmetrics,processorMetrics");
   harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.name", "RuntimeMetrics");
-  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.classes", "DeviceInfoNode,FlowInformation");
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.classes", "DeviceInfoNode,FlowInformation,AssetInformation,DeviceInfoNode,AgentInformation");
   harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name", "LoadMetrics");
   harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes", "QueueMetrics,RepositoryMetrics");
   harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name", "ProcessorMetrics");
diff --git a/libminifi/test/unit/LogMetricsPublisherTests.cpp b/libminifi/test/unit/LogMetricsPublisherTests.cpp
index 511113d37..1801bfadf 100644
--- a/libminifi/test/unit/LogMetricsPublisherTests.cpp
+++ b/libminifi/test/unit/LogMetricsPublisherTests.cpp
@@ -96,9 +96,9 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs"
   publisher_.initialize(configuration_, response_node_loader_);
   publisher_.loadMetricNodes();
   using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
-  std::string expected_log = R"([info] {
-    "LogMetrics": {
-        "RepositoryMetrics": {
+  std::string expected_log_1 = R"([info] {
+    "LogMetrics": {)";
+  std::string expected_log_2 = R"("RepositoryMetrics": {
             "provenancerepository": {
                 "running": "false",
                 "full": "false",
@@ -117,10 +117,12 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs"
                 "rocksDbTableReadersSize": "0",
                 "rocksDbAllMemoryTablesSize": "2048"
             }
-        },
-        "deviceInfo": {
+        })";
+  std::string expected_log_3 = R"("deviceInfo": {
             "identifier":)";
-  REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log));
+  REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_1));
+  REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_2));
+  REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_3));
 }
 
 TEST_CASE_METHOD(LogPublisherTestFixture, "Verify reloading different metrics", "[LogMetricsPublisher]") {
@@ -218,7 +220,7 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify changing log level property fo
   publisher_.initialize(configuration_, response_node_loader_);
   publisher_.loadMetricNodes();
   using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
-  std::string expected_log = R"([debug] {
+  std::string expected_log = R"([info] {
     "LogMetrics": {
         "RepositoryMetrics": {
             "provenancerepository": {

Reply via email to