This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 2173e13e17585a8f3f60edd59c28475bf7669f46
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Tue Nov 22 17:37:37 2022 +0100

    MINIFICPP-1987 Configuring processor metrics with regular expressions
    
    Signed-off-by: Ferenc Gerlits <[email protected]>
    This closes #1459
---
 C2.md                                              |   4 +
 METRICS.md                                         |  16 ++-
 docker/test/integration/minifi/core/ImageStore.py  |   2 +-
 .../http-curl/tests/C2DescribeMetricsTest.cpp      |  30 ++++-
 extensions/http-curl/tests/C2MetricsTest.cpp       |   2 +-
 .../include/core/state/nodes/ResponseNodeLoader.h  |  21 ++--
 libminifi/src/c2/C2Client.cpp                      |   2 +-
 .../src/core/state/nodes/ResponseNodeLoader.cpp    |  40 ++++--
 libminifi/test/unit/ResponseNodeLoaderTests.cpp    | 134 +++++++++++++++++++++
 9 files changed, 219 insertions(+), 32 deletions(-)

diff --git a/C2.md b/C2.md
index 14c17090a..71c40f192 100644
--- a/C2.md
+++ b/C2.md
@@ -117,6 +117,10 @@ a configuration of an agent
     
nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name=ProcessorMetric
     
nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=GetFileMetrics
 
+Processor metrics can also be configured using regular expressions with the 
`processorMetrics/` prefix, so the following definition is also valid:
+
+    
nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=processorMetrics/Get.*Metrics
+
 This example shows a metrics sub tree defined by the option 
'nifi.c2.root.class.definitions'.
 
 This is a comma separated list of all sub trees. In the example, above, only 
one sub tree exists: metrics.
diff --git a/METRICS.md b/METRICS.md
index fa0732c06..bf6ed91e4 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -59,13 +59,13 @@ The following option defines which metric classes should be 
exposed through the
 
     # in minifi.properties
 
-    
nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation
+    
nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation,processorMetrics/Tail.*
 
 An agent identifier should also be defined to identify which agent the metric 
is exposed from. If not set, the hostname is used as the identifier.
 
-       # in minifi.properties
+    # in minifi.properties
 
-       nifi.metrics.publisher.agent.identifier=Agent1
+    nifi.metrics.publisher.agent.identifier=Agent1
 
 ## System Metrics
 
@@ -168,6 +168,16 @@ AgentStatus is a system level metric that defines current 
agent status including
 
 Processor level metrics can be accessed for any processor provided by MiNiFi. 
These metrics correspond to the name of the processor appended by the "Metrics" 
suffix (e.g. GetFileMetrics, TailFileMetrics, etc.).
 
+Besides configuring processor metrics directly, they can also be configured 
using regular expressions with the `processorMetrics/` prefix.
+
+All available processor metrics can be requested in the `minifi.properties` by 
using the following configuration:
+
+    nifi.metrics.publisher.metrics=processorMetrics/.*
+
+Regular expressions can also be used for requesting multiple processor metrics 
at once, like GetFileMetrics and GetTCPMetrics with the following configuration:
+
+    nifi.metrics.publisher.metrics=processorMetrics/Get.*Metrics
+
 ### General Metrics
 
 There are general metrics that are available for all processors. Besides these 
metrics processors can implement additional metrics that are speicific to that 
processor.
diff --git a/docker/test/integration/minifi/core/ImageStore.py 
b/docker/test/integration/minifi/core/ImageStore.py
index c0c158852..20d212c69 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -111,7 +111,7 @@ class ImageStore:
                 RUN echo nifi.metrics.publisher.agent.identifier=Agent1 >> 
{minifi_root}/conf/minifi.properties
                 RUN echo 
nifi.metrics.publisher.class=PrometheusMetricsPublisher >> 
{minifi_root}/conf/minifi.properties
                 RUN echo 
nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 >> 
{minifi_root}/conf/minifi.properties
-                RUN echo 
nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,PutFileMetrics,FlowInformation,DeviceInfoNode,AgentStatus
 >> {minifi_root}/conf/minifi.properties
+                RUN echo 
nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,PutFileMetrics,processorMetrics/Get.*,FlowInformation,DeviceInfoNode,AgentStatus
 >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.c2.enable=true  >> 
{minifi_root}/conf/minifi.properties
                 RUN echo 
nifi.c2.rest.url=http://minifi-c2-server:10090/c2/config/heartbeat  >> 
{minifi_root}/conf/minifi.properties
                 RUN echo 
nifi.c2.rest.url.ack=http://minifi-c2-server:10090/c2/config/acknowledge  >> 
{minifi_root}/conf/minifi.properties
diff --git a/extensions/http-curl/tests/C2DescribeMetricsTest.cpp 
b/extensions/http-curl/tests/C2DescribeMetricsTest.cpp
index acf5d80cb..0e177d5fd 100644
--- a/extensions/http-curl/tests/C2DescribeMetricsTest.cpp
+++ b/extensions/http-curl/tests/C2DescribeMetricsTest.cpp
@@ -66,10 +66,14 @@ class MetricsHandler: public HeartbeatHandler {
 
   void handleHeartbeat(const rapidjson::Document&, struct mg_connection* conn) 
override {
     switch (state_) {
-      case TestState::DESCRIBE_SPECIFIC_METRIC: {
+      case TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC: {
         sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, 
{{"metricsClass", "GetFileMetrics"}});
         break;
       }
+      case TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC: {
+        sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, 
{{"metricsClass", "QueueMetrics"}});
+        break;
+      }
       case TestState::DESCRIBE_ALL_METRICS: {
         sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn);
         break;
@@ -81,8 +85,12 @@ class MetricsHandler: public HeartbeatHandler {
 
   void handleAcknowledge(const rapidjson::Document& root) override {
     switch (state_) {
-      case TestState::DESCRIBE_SPECIFIC_METRIC: {
-        verifySpecificMetrics(root);
+      case TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC: {
+        verifySpecificProcessorMetrics(root);
+        break;
+      }
+      case TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC: {
+        verifySpecificSystemMetrics(root);
         break;
       }
       case TestState::DESCRIBE_ALL_METRICS: {
@@ -96,7 +104,8 @@ class MetricsHandler: public HeartbeatHandler {
 
  private:
   enum class TestState {
-    DESCRIBE_SPECIFIC_METRIC,
+    DESCRIBE_SPECIFIC_PROCESSOR_METRIC,
+    DESCRIBE_SPECIFIC_SYSTEM_METRIC,
     DESCRIBE_ALL_METRICS
   };
 
@@ -109,12 +118,21 @@ class MetricsHandler: public HeartbeatHandler {
     mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: 
text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
   }
 
-  void verifySpecificMetrics(const rapidjson::Document& root) {
+  void verifySpecificProcessorMetrics(const rapidjson::Document& root) {
     auto getfile_metrics_verified =
       !root.HasMember("metrics") &&
       root.HasMember("GetFileMetrics") &&
       root["GetFileMetrics"].HasMember(GETFILE1_UUID) &&
       root["GetFileMetrics"].HasMember(GETFILE2_UUID);
+    if (getfile_metrics_verified) {
+      state_ = TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC;
+    }
+  }
+
+  void verifySpecificSystemMetrics(const rapidjson::Document& root) {
+    auto getfile_metrics_verified =
+      !root.HasMember("metrics") &&
+      root.HasMember("QueueMetrics");
     if (getfile_metrics_verified) {
       state_ = TestState::DESCRIBE_ALL_METRICS;
     }
@@ -136,7 +154,7 @@ class MetricsHandler: public HeartbeatHandler {
     }
   }
 
-  TestState state_ = TestState::DESCRIBE_SPECIFIC_METRIC;
+  TestState state_ = TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC;
   std::atomic_bool& metrics_found_;
 };
 
diff --git a/extensions/http-curl/tests/C2MetricsTest.cpp 
b/extensions/http-curl/tests/C2MetricsTest.cpp
index 16dd9fcbc..2d717e2e7 100644
--- a/extensions/http-curl/tests/C2MetricsTest.cpp
+++ b/extensions/http-curl/tests/C2MetricsTest.cpp
@@ -207,7 +207,7 @@ int main(int argc, char **argv) {
   
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name",
 "LoadMetrics");
   
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes",
 "QueueMetrics,RepositoryMetrics");
   
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name",
 "ProcessorMetrics");
-  
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes",
 "GetTCPMetrics");
+  
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes",
 "processorMetrics/GetTCP.*");
   harness.setKeyDir(args.key_dir);
   auto replacement_path = args.test_file;
   minifi::utils::StringUtils::replaceAll(replacement_path, "TestC2Metrics", 
"TestC2MetricsUpdate");
diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h 
b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
index 8ad250f9f..b30125673 100644
--- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h
+++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
@@ -39,22 +39,23 @@ class ResponseNodeLoader {
  public:
   ResponseNodeLoader(std::shared_ptr<Configure> configuration, 
std::shared_ptr<core::Repository> provenance_repo,
     std::shared_ptr<core::Repository> flow_file_repo, core::FlowConfiguration* 
flow_configuration);
-  std::vector<std::shared_ptr<ResponseNode>> loadResponseNodes(const 
std::string& clazz, core::ProcessGroup* root);
-  std::vector<std::shared_ptr<ResponseNode>> getComponentMetricsNodes(const 
std::string& metrics_class) const;
+  void initializeComponentMetrics(core::ProcessGroup* root);
   void 
setControllerServiceProvider(core::controller::ControllerServiceProvider* 
controller);
   void setStateMonitor(state::StateMonitor* update_sink);
-  void initializeComponentMetrics(core::ProcessGroup* root);
+  std::vector<std::shared_ptr<ResponseNode>> loadResponseNodes(const 
std::string& clazz, core::ProcessGroup* root) const;
 
  private:
+  std::vector<std::shared_ptr<ResponseNode>> getComponentMetricsNodes(const 
std::string& metrics_class) const;
   std::vector<std::shared_ptr<ResponseNode>> getResponseNodes(const 
std::string& clazz) const;
-  void initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& 
response_node);
+  void initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& 
response_node) const;
   static void initializeQueueMetrics(const std::shared_ptr<ResponseNode>& 
response_node, core::ProcessGroup* root);
-  void initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& 
response_node);
-  void initializeAgentMonitor(const std::shared_ptr<ResponseNode>& 
response_node);
-  void initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node);
-  void initializeAgentStatus(const std::shared_ptr<ResponseNode>& 
response_node);
-  void initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& 
response_node);
-  void initializeFlowMonitor(const std::shared_ptr<ResponseNode>& 
response_node, core::ProcessGroup* root);
+  void initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& 
response_node) const;
+  void initializeAgentMonitor(const std::shared_ptr<ResponseNode>& 
response_node) const;
+  void initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node) 
const;
+  void initializeAgentStatus(const std::shared_ptr<ResponseNode>& 
response_node) const;
+  void initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& 
response_node) const;
+  void initializeFlowMonitor(const std::shared_ptr<ResponseNode>& 
response_node, core::ProcessGroup* root) const;
+  std::vector<std::shared_ptr<ResponseNode>> 
getMatchingComponentMetricsNodes(const std::string& regex_str) const;
 
   mutable std::mutex component_metrics_mutex_;
   std::unordered_map<std::string, std::vector<std::shared_ptr<ResponseNode>>> 
component_metrics_;
diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp
index 39ec5131f..26b6c6817 100644
--- a/libminifi/src/c2/C2Client.cpp
+++ b/libminifi/src/c2/C2Client.cpp
@@ -210,7 +210,7 @@ std::optional<state::response::NodeReporter::ReportedNode> 
C2Client::getMetricsN
   };
 
   if (!metrics_class.empty()) {
-    auto metrics_nodes = 
response_node_loader_.getComponentMetricsNodes(metrics_class);
+    auto metrics_nodes = 
response_node_loader_.loadResponseNodes(metrics_class, root_.get());
     if (!metrics_nodes.empty()) {
       return createReportedNode(metrics_nodes);
     }
diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp 
b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
index 149fe14e1..df2f8720e 100644
--- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
+++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
@@ -27,6 +27,8 @@
 #include "core/state/nodes/ConfigurationChecksums.h"
 #include "c2/C2Agent.h"
 #include "utils/gsl.h"
+#include "utils/RegexUtils.h"
+#include "utils/StringUtils.h"
 
 namespace org::apache::nifi::minifi::state::response {
 
@@ -75,7 +77,7 @@ std::vector<std::shared_ptr<ResponseNode>> 
ResponseNodeLoader::getResponseNodes(
   return {response_node};
 }
 
-void ResponseNodeLoader::initializeRepositoryMetrics(const 
std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeRepositoryMetrics(const 
std::shared_ptr<ResponseNode>& response_node) const {
   auto repository_metrics = 
dynamic_cast<RepositoryMetrics*>(response_node.get());
   if (repository_metrics != nullptr) {
     repository_metrics->addRepository(provenance_repo_);
@@ -98,14 +100,14 @@ void ResponseNodeLoader::initializeQueueMetrics(const 
std::shared_ptr<ResponseNo
   }
 }
 
-void ResponseNodeLoader::initializeAgentIdentifier(const 
std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentIdentifier(const 
std::shared_ptr<ResponseNode>& response_node) const {
   auto identifier = 
dynamic_cast<state::response::AgentIdentifier*>(response_node.get());
   if (identifier != nullptr) {
     identifier->setAgentIdentificationProvider(configuration_);
   }
 }
 
-void ResponseNodeLoader::initializeAgentMonitor(const 
std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentMonitor(const 
std::shared_ptr<ResponseNode>& response_node) const {
   auto monitor = 
dynamic_cast<state::response::AgentMonitor*>(response_node.get());
   if (monitor != nullptr) {
     monitor->addRepository(provenance_repo_);
@@ -114,7 +116,7 @@ void ResponseNodeLoader::initializeAgentMonitor(const 
std::shared_ptr<ResponseNo
   }
 }
 
-void ResponseNodeLoader::initializeAgentNode(const 
std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentNode(const 
std::shared_ptr<ResponseNode>& response_node) const {
   auto agent_node = 
dynamic_cast<state::response::AgentNode*>(response_node.get());
   if (agent_node != nullptr && controller_ != nullptr) {
     
agent_node->setUpdatePolicyController(std::static_pointer_cast<controllers::UpdatePolicyControllerService>(controller_->getControllerService(c2::C2Agent::UPDATE_NAME)).get());
@@ -126,7 +128,7 @@ void ResponseNodeLoader::initializeAgentNode(const 
std::shared_ptr<ResponseNode>
   }
 }
 
-void ResponseNodeLoader::initializeAgentStatus(const 
std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentStatus(const 
std::shared_ptr<ResponseNode>& response_node) const {
   auto agent_status = 
dynamic_cast<state::response::AgentStatus*>(response_node.get());
   if (agent_status != nullptr) {
     agent_status->addRepository(provenance_repo_);
@@ -135,7 +137,7 @@ void ResponseNodeLoader::initializeAgentStatus(const 
std::shared_ptr<ResponseNod
   }
 }
 
-void ResponseNodeLoader::initializeConfigurationChecksums(const 
std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeConfigurationChecksums(const 
std::shared_ptr<ResponseNode>& response_node) const {
   auto configuration_checksums = 
dynamic_cast<state::response::ConfigurationChecksums*>(response_node.get());
   if (configuration_checksums) {
     
configuration_checksums->addChecksumCalculator(configuration_->getChecksumCalculator());
@@ -145,7 +147,7 @@ void 
ResponseNodeLoader::initializeConfigurationChecksums(const std::shared_ptr<
   }
 }
 
-void ResponseNodeLoader::initializeFlowMonitor(const 
std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) {
+void ResponseNodeLoader::initializeFlowMonitor(const 
std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) const {
   auto flowMonitor = 
dynamic_cast<state::response::FlowMonitor*>(response_node.get());
   if (flowMonitor == nullptr) {
     return;
@@ -165,7 +167,7 @@ void ResponseNodeLoader::initializeFlowMonitor(const 
std::shared_ptr<ResponseNod
   }
 }
 
-std::vector<std::shared_ptr<ResponseNode>> 
ResponseNodeLoader::loadResponseNodes(const std::string& clazz, 
core::ProcessGroup* root) {
+std::vector<std::shared_ptr<ResponseNode>> 
ResponseNodeLoader::loadResponseNodes(const std::string& clazz, 
core::ProcessGroup* root) const {
   auto response_nodes = getResponseNodes(clazz);
   if (response_nodes.empty()) {
     logger_->log_error("No metric defined for %s", clazz);
@@ -185,9 +187,27 @@ std::vector<std::shared_ptr<ResponseNode>> 
ResponseNodeLoader::loadResponseNodes
   return response_nodes;
 }
 
+std::vector<std::shared_ptr<ResponseNode>> 
ResponseNodeLoader::getMatchingComponentMetricsNodes(const std::string& 
regex_str) const {
+  std::vector<std::shared_ptr<ResponseNode>> result;
+  for (const auto& [metric_name, metrics] : component_metrics_) {
+    utils::Regex regex(regex_str);
+    if (utils::regexMatch(metric_name, regex)) {
+      result.insert(result.end(), metrics.begin(), metrics.end());
+    }
+  }
+  return result;
+}
+
 std::vector<std::shared_ptr<ResponseNode>> 
ResponseNodeLoader::getComponentMetricsNodes(const std::string& metrics_class) 
const {
-  if (!metrics_class.empty()) {
-    std::lock_guard<std::mutex> lock(component_metrics_mutex_);
+  if (metrics_class.empty()) {
+    return {};
+  }
+  std::lock_guard<std::mutex> lock(component_metrics_mutex_);
+  static const std::string PROCESSOR_FILTER_PREFIX = "processorMetrics/";
+  if (utils::StringUtils::startsWith(metrics_class, PROCESSOR_FILTER_PREFIX)) {
+    auto regex_str = metrics_class.substr(PROCESSOR_FILTER_PREFIX.size());
+    return getMatchingComponentMetricsNodes(regex_str);
+  } else {
     const auto citer = component_metrics_.find(metrics_class);
     if (citer != component_metrics_.end()) {
       return citer->second;
diff --git a/libminifi/test/unit/ResponseNodeLoaderTests.cpp 
b/libminifi/test/unit/ResponseNodeLoaderTests.cpp
new file mode 100644
index 000000000..222346764
--- /dev/null
+++ b/libminifi/test/unit/ResponseNodeLoaderTests.cpp
@@ -0,0 +1,134 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include "../Catch.h"
+#include "core/state/nodes/ResponseNodeLoader.h"
+#include "../ReadFromFlowFileTestProcessor.h"
+#include "../WriteToFlowFileTestProcessor.h"
+#include "core/repository/VolatileContentRepository.h"
+#include "utils/Id.h"
+#include "ProvenanceTestHelper.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class ResponseNodeLoaderTestFixture {
+ public:
+  ResponseNodeLoaderTestFixture()
+    : 
root_(std::make_unique<minifi::core::ProcessGroup>(minifi::core::ProcessGroupType::ROOT_PROCESS_GROUP,
 "root")),
+      configuration_(std::make_shared<minifi::Configure>()),
+      prov_repo_(std::make_shared<TestRepository>()),
+      ff_repository_(std::make_shared<TestRepository>()),
+      
content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
+      response_node_loader_(configuration_, prov_repo_, ff_repository_, 
nullptr) {
+    ff_repository_->initialize(configuration_);
+    content_repo_->initialize(configuration_);
+    auto uuid1 = 
addProcessor<minifi::processors::WriteToFlowFileTestProcessor>("WriteToFlowFileTestProcessor1");
+    auto uuid2 = 
addProcessor<minifi::processors::WriteToFlowFileTestProcessor>("WriteToFlowFileTestProcessor1");
+    addConnection("Connection", "success", uuid1, uuid2);
+    auto uuid3 = 
addProcessor<minifi::processors::ReadFromFlowFileTestProcessor>("ReadFromFlowFileTestProcessor");
+    addConnection("Connection", "success", uuid2, uuid3);
+    response_node_loader_.initializeComponentMetrics(root_.get());
+  }
+
+ protected:
+  template<typename T, typename = typename 
std::enable_if_t<std::is_base_of_v<minifi::core::Processor, T>>>
+  minifi::utils::Identifier addProcessor(const std::string& name) {
+    auto processor = std::make_unique<T>(name);
+    auto uuid = processor->getUUID();
+    root_->addProcessor(std::move(processor));
+    return uuid;
+  }
+
+  void addConnection(const std::string& connection_name, const std::string& 
relationship_name, const minifi::utils::Identifier& src_uuid, const 
minifi::utils::Identifier& dst_uuid) {
+    auto connection = std::make_unique<minifi::Connection>(ff_repository_, 
content_repo_, connection_name);
+    connection->setRelationship({relationship_name, "d"});
+    connection->setDestinationUUID(src_uuid);
+    connection->setSourceUUID(dst_uuid);
+    root_->addConnection(std::move(connection));
+  }
+
+  std::unique_ptr<minifi::core::ProcessGroup> root_;
+  std::shared_ptr<minifi::Configure> configuration_;
+  std::shared_ptr<TestRepository> prov_repo_;
+  std::shared_ptr<TestRepository> ff_repository_;
+  std::shared_ptr<minifi::core::ContentRepository> content_repo_;
+  minifi::state::response::ResponseNodeLoader response_node_loader_;
+};
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load non-existent response 
node", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("NonExistentNode", 
root_.get());
+  REQUIRE(nodes.empty());
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load processor metrics node 
not part of the flow config", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("TailFileMetrics", 
root_.get());
+  REQUIRE(nodes.empty());
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load system metrics node", 
"[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("QueueMetrics", 
root_.get());
+  REQUIRE(nodes.size() == 1);
+  REQUIRE(nodes[0]->getName() == "QueueMetrics");
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load processor metrics node 
part of the flow config", "[responseNodeLoaderTest]") {
+  auto nodes = 
response_node_loader_.loadResponseNodes("ReadFromFlowFileTestProcessorMetrics", 
root_.get());
+  REQUIRE(nodes.size() == 1);
+  REQUIRE(nodes[0]->getName() == "ReadFromFlowFileTestProcessorMetrics");
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load multiple processor 
metrics nodes of the same type in a single flow", "[responseNodeLoaderTest]") {
+  auto nodes = 
response_node_loader_.loadResponseNodes("WriteToFlowFileTestProcessorMetrics", 
root_.get());
+  REQUIRE(nodes.size() == 2);
+  REQUIRE(nodes[0]->getName() == "WriteToFlowFileTestProcessorMetrics");
+  REQUIRE(nodes[1]->getName() == "WriteToFlowFileTestProcessorMetrics");
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Use regex to filter processor 
metrics", "[responseNodeLoaderTest]") {
+  SECTION("Load all processor metrics with regex") {
+    auto nodes = 
response_node_loader_.loadResponseNodes("processorMetrics/.*", root_.get());
+    std::unordered_map<std::string, uint32_t> metric_counts;
+    REQUIRE(nodes.size() == 3);
+    for (const auto& node : nodes) {
+      metric_counts[node->getName()]++;
+    }
+    REQUIRE(metric_counts["WriteToFlowFileTestProcessorMetrics"] == 2);
+    REQUIRE(metric_counts["ReadFromFlowFileTestProcessorMetrics"] == 1);
+  }
+
+  SECTION("Filter for a single processor") {
+    auto nodes = 
response_node_loader_.loadResponseNodes("processorMetrics/Read.*", root_.get());
+    REQUIRE(nodes.size() == 1);
+    REQUIRE(nodes[0]->getName() == "ReadFromFlowFileTestProcessorMetrics");
+  }
+
+  SECTION("Full match") {
+    auto nodes = 
response_node_loader_.loadResponseNodes("processorMetrics/ReadFromFlowFileTestProcessorMetrics",
 root_.get());
+    REQUIRE(nodes.size() == 1);
+    REQUIRE(nodes[0]->getName() == "ReadFromFlowFileTestProcessorMetrics");
+  }
+
+  SECTION("No partial match is allowed") {
+    auto nodes = 
response_node_loader_.loadResponseNodes("processorMetrics/Read", root_.get());
+    REQUIRE(nodes.empty());
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::test

Reply via email to