This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 852641804c56e520e5772274437c19fce3953bda
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed Nov 9 13:34:48 2022 +0100

    MINIFICPP-1966 Add AgentStatus to Prometheus metrics
    
    AgentStatus, which contains agent-specific metrics such as the agent's
    CPU and memory utilization, was missing from the Prometheus metrics.
    This commit adds it.
    
    Closes #1438
    Signed-off-by: Marton Szasz <[email protected]>
---
 METRICS.md                                         | 25 +++++++++++++-
 .../test/integration/features/prometheus.feature   |  1 +
 docker/test/integration/minifi/core/ImageStore.py  |  2 +-
 .../integration/minifi/core/PrometheusChecker.py   | 13 +++++++
 .../include/core/state/nodes/AgentInformation.h    | 40 +++++++++++++++++++++-
 .../include/core/state/nodes/ResponseNodeLoader.h  |  1 +
 .../src/core/state/nodes/AgentInformation.cpp      |  1 +
 .../src/core/state/nodes/ResponseNodeLoader.cpp    | 10 ++++++
 8 files changed, 90 insertions(+), 3 deletions(-)

diff --git a/METRICS.md b/METRICS.md
index 543b67f4c..fa0732c06 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -124,7 +124,7 @@ DeviceInfoNode is a system level metric that reports 
metrics about the system re
 
 ### FlowInformation
 
-DeviceInfoNode is a system level metric that reports metrics about the system 
resources used and available
+FlowInformation is a system level metric that reports component and queue related metrics.
 
 | Metric name          | Labels                           | Description        
                        |
 
|----------------------|----------------------------------|--------------------------------------------|
@@ -141,6 +141,29 @@ DeviceInfoNode is a system level metric that reports 
metrics about the system re
 | component_uuid  | UUID of the component                                      
  |
 | component_name  | Name of the component                                      
  |
 
+### AgentStatus
+
+AgentStatus is a system level metric that reports the current agent status, including repository, component and resource usage information.
+
+| Metric name              | Labels                         | Description      
                                                                                
          |
+|--------------------------|--------------------------------|------------------------------------------------------------------------------------------------------------|
+| is_running               | repository_name                | Is the 
repository running (1 or 0)                                                     
                    |
+| is_full                  | repository_name                | Is the 
repository full (1 or 0)                                                        
                    |
+| repository_size          | repository_name                | Current size of 
the repository                                                                  
           |
+| uptime_milliseconds      | -                              | Agent uptime in 
milliseconds                                                                    
           |
+| is_running               | component_uuid, component_name | Check if the 
component is running (1 or 0)                                                   
              |
+| agent_memory_usage_bytes | -                              | Memory used by 
the agent process in bytes                                                      
            |
+| agent_cpu_utilization    | -                              | CPU utilization 
of the agent process (between 0 and 1). In case of a query error the returned 
value is -1. |
+
+| Label           | Description                                              |
+|-----------------|----------------------------------------------------------|
+| repository_name | Name of the reported repository                          |
+| connection_uuid | UUID of the connection defined in the flow configuration |
+| connection_name | Name of the connection defined in the flow configuration |
+| component_uuid  | UUID of the component                                    |
+| component_name  | Name of the component                                    |
+
+
 ## Processor Metrics
 
 Processor level metrics can be accessed for any processor provided by MiNiFi. 
These metrics correspond to the name of the processor appended by the "Metrics" 
suffix (e.g. GetFileMetrics, TailFileMetrics, etc.).
diff --git a/docker/test/integration/features/prometheus.feature 
b/docker/test/integration/features/prometheus.feature
index 8c37a0109..e1af51ede 100644
--- a/docker/test/integration/features/prometheus.feature
+++ b/docker/test/integration/features/prometheus.feature
@@ -16,6 +16,7 @@ Feature: MiNiFi can publish metrics to Prometheus server
     And "PutFileMetrics" processor metric is published to the Prometheus 
server in less than 60 seconds for "PutFile" processor
     And "FlowInformation" is published to the Prometheus server in less than 
60 seconds
     And "DeviceInfoNode" is published to the Prometheus server in less than 60 
seconds
+    And "AgentStatus" is published to the Prometheus server in less than 60 
seconds
 
   Scenario: Multiple GetFile metrics are reported by Prometheus
     Given a GetFile processor with the name "GetFile1" and the "Input 
Directory" property set to "/tmp/input"
diff --git a/docker/test/integration/minifi/core/ImageStore.py 
b/docker/test/integration/minifi/core/ImageStore.py
index 7f45e8273..4076f3031 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -105,7 +105,7 @@ class ImageStore:
                 RUN echo nifi.metrics.publisher.agent.identifier=Agent1 >> 
{minifi_root}/conf/minifi.properties
                 RUN echo 
nifi.metrics.publisher.class=PrometheusMetricsPublisher >> 
{minifi_root}/conf/minifi.properties
                 RUN echo 
nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 >> 
{minifi_root}/conf/minifi.properties
-                RUN echo 
nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,PutFileMetrics,FlowInformation,DeviceInfoNode
 >> {minifi_root}/conf/minifi.properties
+                RUN echo 
nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,PutFileMetrics,FlowInformation,DeviceInfoNode,AgentStatus
 >> {minifi_root}/conf/minifi.properties
                 USER minificpp
                 """.format(base_image='apacheminificpp:' + 
MinifiContainer.MINIFI_VERSION,
                            minifi_root=MinifiContainer.MINIFI_ROOT))
diff --git a/docker/test/integration/minifi/core/PrometheusChecker.py 
b/docker/test/integration/minifi/core/PrometheusChecker.py
index 936ae4178..14dc8eda6 100644
--- a/docker/test/integration/minifi/core/PrometheusChecker.py
+++ b/docker/test/integration/minifi/core/PrometheusChecker.py
@@ -37,6 +37,8 @@ class PrometheusChecker:
             return self.verify_flow_information_metrics()
         elif metric_class == "DeviceInfoNode":
             return self.verify_device_info_node_metrics()
+        elif metric_class == "AgentStatus":
+            return self.verify_agent_status_metrics()
         else:
             raise Exception("Metric class '%s' verification is not 
implemented" % metric_class)
 
@@ -64,6 +66,17 @@ class PrometheusChecker:
     def verify_device_info_node_metrics(self):
         return self.verify_metrics_exist(['minifi_physical_mem', 
'minifi_memory_usage', 'minifi_cpu_utilization'], 'DeviceInfoNode')
 
+    def verify_agent_status_metrics(self):
+        label_list = [{'repository_name': 'provenance'}, {'repository_name': 
'flowfile'}]
+        for labels in label_list:
+            if not (self.verify_metric_exists('minifi_is_running', 
'AgentStatus', labels)
+                    and self.verify_metric_exists('minifi_is_full', 
'AgentStatus', labels)
+                    and self.verify_metric_exists('minifi_repository_size', 
'AgentStatus', labels)):
+                return False
+        return self.verify_metric_exists('minifi_uptime_milliseconds', 
'AgentStatus') and \
+            self.verify_metric_exists('minifi_agent_memory_usage_bytes', 
'AgentStatus') and \
+            self.verify_metric_exists('minifi_agent_cpu_utilization', 
'AgentStatus')
+
     def verify_metric_exists(self, metric_name, metric_class, labels={}):
         labels['metric_class'] = metric_class
         labels['agent_identifier'] = "Agent1"
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h 
b/libminifi/include/core/state/nodes/AgentInformation.h
index 42515d120..827a3e7d6 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -418,14 +418,22 @@ class AgentStatus : public StateMonitorNode {
       : StateMonitorNode(name) {
   }
 
+  MINIFIAPI static constexpr const char* Description = "Metric node that 
defines current agent status including repository, component and resource usage 
information.";
+
   std::string getName() const override {
-    return "status";
+    return "AgentStatus";
   }
 
   void setRepositories(const std::map<std::string, 
std::shared_ptr<core::Repository>> &repositories) {
     repositories_ = repositories;
   }
 
+  void addRepository(const std::shared_ptr<core::Repository> &repo) {
+    if (nullptr != repo) {
+      repositories_.insert(std::make_pair(repo->getName(), repo));
+    }
+  }
+
   std::vector<SerializedResponseNode> serialize() override {
     std::vector<SerializedResponseNode> serialized;
     auto serializedRepositories = serializeRepositories();
@@ -444,6 +452,36 @@ class AgentStatus : public StateMonitorNode {
     return serialized;
   }
 
+  std::vector<PublishedMetric> calculateMetrics() override {
+    std::vector<PublishedMetric> metrics;
+    for (const auto& [_, repo] : repositories_) {
+      metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0), 
{{"metric_class", getName()}, {"repository_name", repo->getName()}}});
+      metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0), 
{{"metric_class", getName()}, {"repository_name", repo->getName()}}});
+      metrics.push_back({"repository_size", 
static_cast<double>(repo->getRepoSize()), {{"metric_class", getName()}, 
{"repository_name", repo->getName()}}});
+    }
+    if (nullptr != monitor_) {
+      auto uptime = monitor_->getUptime();
+      metrics.push_back({"uptime_milliseconds", static_cast<double>(uptime), 
{{"metric_class", getName()}}});
+    }
+
+    if (nullptr != monitor_) {
+      monitor_->executeOnAllComponents([this, &metrics](StateController& 
component){
+        metrics.push_back({"is_running", (component.isRunning() ? 1.0 : 0.0),
+          {{"component_uuid", component.getComponentUUID().to_string()}, 
{"component_name", component.getComponentName()}, {"metric_class", 
getName()}}});
+      });
+    }
+
+    metrics.push_back({"agent_memory_usage_bytes", 
static_cast<double>(utils::OsUtils::getCurrentProcessPhysicalMemoryUsage()), 
{{"metric_class", getName()}}});
+
+    double cpu_usage = -1.0;
+    {
+      std::lock_guard<std::mutex> guard(cpu_load_tracker_mutex_);
+      cpu_usage = cpu_load_tracker_.getCpuUsageAndRestartCollection();
+    }
+    metrics.push_back({"agent_cpu_utilization", cpu_usage, {{"metric_class", 
getName()}}});
+    return metrics;
+  }
+
  protected:
   SerializedResponseNode serializeRepositories() const {
     SerializedResponseNode repositories;
diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h 
b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
index 58d2c8d85..8ad250f9f 100644
--- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h
+++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
@@ -52,6 +52,7 @@ class ResponseNodeLoader {
   void initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& 
response_node);
   void initializeAgentMonitor(const std::shared_ptr<ResponseNode>& 
response_node);
   void initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node);
+  void initializeAgentStatus(const std::shared_ptr<ResponseNode>& 
response_node);
   void initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& 
response_node);
   void initializeFlowMonitor(const std::shared_ptr<ResponseNode>& 
response_node, core::ProcessGroup* root);
 
diff --git a/libminifi/src/core/state/nodes/AgentInformation.cpp 
b/libminifi/src/core/state/nodes/AgentInformation.cpp
index 0609f25ec..7bbf44d3b 100644
--- a/libminifi/src/core/state/nodes/AgentInformation.cpp
+++ b/libminifi/src/core/state/nodes/AgentInformation.cpp
@@ -24,5 +24,6 @@ utils::ProcessCpuUsageTracker AgentStatus::cpu_load_tracker_;
 std::mutex AgentStatus::cpu_load_tracker_mutex_;
 
 REGISTER_RESOURCE(AgentInformation, DescriptionOnly);
+REGISTER_RESOURCE(AgentStatus, DescriptionOnly);
 
 }  // namespace org::apache::nifi::minifi::state::response
diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp 
b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
index ad1389223..149fe14e1 100644
--- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
+++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
@@ -126,6 +126,15 @@ void ResponseNodeLoader::initializeAgentNode(const 
std::shared_ptr<ResponseNode>
   }
 }
 
+void ResponseNodeLoader::initializeAgentStatus(const 
std::shared_ptr<ResponseNode>& response_node) {
+  auto agent_status = 
dynamic_cast<state::response::AgentStatus*>(response_node.get());
+  if (agent_status != nullptr) {
+    agent_status->addRepository(provenance_repo_);
+    agent_status->addRepository(flow_file_repo_);
+    agent_status->setStateMonitor(update_sink_);
+  }
+}
+
 void ResponseNodeLoader::initializeConfigurationChecksums(const 
std::shared_ptr<ResponseNode>& response_node) {
   auto configuration_checksums = 
dynamic_cast<state::response::ConfigurationChecksums*>(response_node.get());
   if (configuration_checksums) {
@@ -169,6 +178,7 @@ std::vector<std::shared_ptr<ResponseNode>> 
ResponseNodeLoader::loadResponseNodes
     initializeAgentIdentifier(response_node);
     initializeAgentMonitor(response_node);
     initializeAgentNode(response_node);
+    initializeAgentStatus(response_node);
     initializeConfigurationChecksums(response_node);
     initializeFlowMonitor(response_node, root);
   }

Reply via email to