This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 852641804c56e520e5772274437c19fce3953bda Author: Gabor Gyimesi <[email protected]> AuthorDate: Wed Nov 9 13:34:48 2022 +0100 MINIFICPP-1966 Add AgentStatus to Prometheus metrics AgentStatus, containing agent-specific metrics like the agent's CPU and memory utilization, was missing from the Prometheus metrics. This commit adds it. Closes #1438 Signed-off-by: Marton Szasz <[email protected]> --- METRICS.md | 23 ++++++++++++- .../test/integration/features/prometheus.feature | 1 + docker/test/integration/minifi/core/ImageStore.py | 2 +- .../integration/minifi/core/PrometheusChecker.py | 13 +++++++ .../include/core/state/nodes/AgentInformation.h | 40 +++++++++++++++++++++- .../include/core/state/nodes/ResponseNodeLoader.h | 1 + .../src/core/state/nodes/AgentInformation.cpp | 1 + .../src/core/state/nodes/ResponseNodeLoader.cpp | 10 ++++++ 8 files changed, 88 insertions(+), 3 deletions(-) diff --git a/METRICS.md b/METRICS.md index 543b67f4c..fa0732c06 100644 --- a/METRICS.md +++ b/METRICS.md @@ -124,7 +124,7 @@ DeviceInfoNode is a system level metric that reports metrics about the system re ### FlowInformation -DeviceInfoNode is a system level metric that reports metrics about the system resources used and available +FlowInformation is a system level metric that reports component and queue related metrics. | Metric name | Labels | Description | |----------------------|----------------------------------|--------------------------------------------| @@ -141,6 +141,27 @@ DeviceInfoNode is a system level metric that reports metrics about the system re | component_uuid | UUID of the component | | component_name | Name of the component | +### AgentStatus + +AgentStatus is a system level metric that defines current agent status including repository, component and resource usage information. 
+ +| Metric name | Labels | Description | +|--------------------------|--------------------------------|------------------------------------------------------------------------------------------------------------| +| is_running | repository_name | Is the repository running (1 or 0) | +| is_full | repository_name | Is the repository full (1 or 0) | +| repository_size | repository_name | Current size of the repository | +| uptime_milliseconds | - | Agent uptime in milliseconds | +| is_running | component_uuid, component_name | Check if the component is running (1 or 0) | +| agent_memory_usage_bytes | - | Memory used by the agent process in bytes | +| agent_cpu_utilization | - | CPU utilization of the agent process (between 0 and 1). In case of a query error the returned value is -1. | + +| Label | Description | +|-----------------|----------------------------------------------------------| +| repository_name | Name of the reported repository | +| component_uuid | UUID of the component | +| component_name | Name of the component | + + ## Processor Metrics Processor level metrics can be accessed for any processor provided by MiNiFi. These metrics correspond to the name of the processor appended by the "Metrics" suffix (e.g. GetFileMetrics, TailFileMetrics, etc.). 
diff --git a/docker/test/integration/features/prometheus.feature b/docker/test/integration/features/prometheus.feature index 8c37a0109..e1af51ede 100644 --- a/docker/test/integration/features/prometheus.feature +++ b/docker/test/integration/features/prometheus.feature @@ -16,6 +16,7 @@ Feature: MiNiFi can publish metrics to Prometheus server And "PutFileMetrics" processor metric is published to the Prometheus server in less than 60 seconds for "PutFile" processor And "FlowInformation" is published to the Prometheus server in less than 60 seconds And "DeviceInfoNode" is published to the Prometheus server in less than 60 seconds + And "AgentStatus" is published to the Prometheus server in less than 60 seconds Scenario: Multiple GetFile metrics are reported by Prometheus Given a GetFile processor with the name "GetFile1" and the "Input Directory" property set to "/tmp/input" diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py index 7f45e8273..4076f3031 100644 --- a/docker/test/integration/minifi/core/ImageStore.py +++ b/docker/test/integration/minifi/core/ImageStore.py @@ -105,7 +105,7 @@ class ImageStore: RUN echo nifi.metrics.publisher.agent.identifier=Agent1 >> {minifi_root}/conf/minifi.properties RUN echo nifi.metrics.publisher.class=PrometheusMetricsPublisher >> {minifi_root}/conf/minifi.properties RUN echo nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 >> {minifi_root}/conf/minifi.properties - RUN echo nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,PutFileMetrics,FlowInformation,DeviceInfoNode >> {minifi_root}/conf/minifi.properties + RUN echo nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,PutFileMetrics,FlowInformation,DeviceInfoNode,AgentStatus >> {minifi_root}/conf/minifi.properties USER minificpp """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_VERSION, 
minifi_root=MinifiContainer.MINIFI_ROOT)) diff --git a/docker/test/integration/minifi/core/PrometheusChecker.py b/docker/test/integration/minifi/core/PrometheusChecker.py index 936ae4178..14dc8eda6 100644 --- a/docker/test/integration/minifi/core/PrometheusChecker.py +++ b/docker/test/integration/minifi/core/PrometheusChecker.py @@ -37,6 +37,8 @@ class PrometheusChecker: return self.verify_flow_information_metrics() elif metric_class == "DeviceInfoNode": return self.verify_device_info_node_metrics() + elif metric_class == "AgentStatus": + return self.verify_agent_status_metrics() else: raise Exception("Metric class '%s' verification is not implemented" % metric_class) @@ -64,6 +66,17 @@ class PrometheusChecker: def verify_device_info_node_metrics(self): return self.verify_metrics_exist(['minifi_physical_mem', 'minifi_memory_usage', 'minifi_cpu_utilization'], 'DeviceInfoNode') + def verify_agent_status_metrics(self): + label_list = [{'repository_name': 'provenance'}, {'repository_name': 'flowfile'}] + for labels in label_list: + if not (self.verify_metric_exists('minifi_is_running', 'AgentStatus', labels) + and self.verify_metric_exists('minifi_is_full', 'AgentStatus', labels) + and self.verify_metric_exists('minifi_repository_size', 'AgentStatus', labels)): + return False + return self.verify_metric_exists('minifi_uptime_milliseconds', 'AgentStatus') and \ + self.verify_metric_exists('minifi_agent_memory_usage_bytes', 'AgentStatus') and \ + self.verify_metric_exists('minifi_agent_cpu_utilization', 'AgentStatus') + def verify_metric_exists(self, metric_name, metric_class, labels={}): labels['metric_class'] = metric_class labels['agent_identifier'] = "Agent1" diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h index 42515d120..827a3e7d6 100644 --- a/libminifi/include/core/state/nodes/AgentInformation.h +++ b/libminifi/include/core/state/nodes/AgentInformation.h @@ -418,14 +418,22 @@ class 
AgentStatus : public StateMonitorNode { : StateMonitorNode(name) { } + MINIFIAPI static constexpr const char* Description = "Metric node that defines current agent status including repository, component and resource usage information."; + std::string getName() const override { - return "status"; + return "AgentStatus"; } void setRepositories(const std::map<std::string, std::shared_ptr<core::Repository>> &repositories) { repositories_ = repositories; } + void addRepository(const std::shared_ptr<core::Repository> &repo) { + if (nullptr != repo) { + repositories_.insert(std::make_pair(repo->getName(), repo)); + } + } + std::vector<SerializedResponseNode> serialize() override { std::vector<SerializedResponseNode> serialized; auto serializedRepositories = serializeRepositories(); @@ -444,6 +452,36 @@ class AgentStatus : public StateMonitorNode { return serialized; } + std::vector<PublishedMetric> calculateMetrics() override { + std::vector<PublishedMetric> metrics; + for (const auto& [_, repo] : repositories_) { + metrics.push_back({"is_running", (repo->isRunning() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getName()}}}); + metrics.push_back({"is_full", (repo->isFull() ? 1.0 : 0.0), {{"metric_class", getName()}, {"repository_name", repo->getName()}}}); + metrics.push_back({"repository_size", static_cast<double>(repo->getRepoSize()), {{"metric_class", getName()}, {"repository_name", repo->getName()}}}); + } + if (nullptr != monitor_) { + auto uptime = monitor_->getUptime(); + metrics.push_back({"uptime_milliseconds", static_cast<double>(uptime), {{"metric_class", getName()}}}); + } + + if (nullptr != monitor_) { + monitor_->executeOnAllComponents([this, &metrics](StateController& component){ + metrics.push_back({"is_running", (component.isRunning() ? 
1.0 : 0.0), + {{"component_uuid", component.getComponentUUID().to_string()}, {"component_name", component.getComponentName()}, {"metric_class", getName()}}}); + }); + } + + metrics.push_back({"agent_memory_usage_bytes", static_cast<double>(utils::OsUtils::getCurrentProcessPhysicalMemoryUsage()), {{"metric_class", getName()}}}); + + double cpu_usage = -1.0; + { + std::lock_guard<std::mutex> guard(cpu_load_tracker_mutex_); + cpu_usage = cpu_load_tracker_.getCpuUsageAndRestartCollection(); + } + metrics.push_back({"agent_cpu_utilization", cpu_usage, {{"metric_class", getName()}}}); + return metrics; + } + protected: SerializedResponseNode serializeRepositories() const { SerializedResponseNode repositories; diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h index 58d2c8d85..8ad250f9f 100644 --- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h +++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h @@ -52,6 +52,7 @@ class ResponseNodeLoader { void initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node); void initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node); void initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node); + void initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node); void initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node); void initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root); diff --git a/libminifi/src/core/state/nodes/AgentInformation.cpp b/libminifi/src/core/state/nodes/AgentInformation.cpp index 0609f25ec..7bbf44d3b 100644 --- a/libminifi/src/core/state/nodes/AgentInformation.cpp +++ b/libminifi/src/core/state/nodes/AgentInformation.cpp @@ -24,5 +24,6 @@ utils::ProcessCpuUsageTracker AgentStatus::cpu_load_tracker_; std::mutex AgentStatus::cpu_load_tracker_mutex_; REGISTER_RESOURCE(AgentInformation, 
DescriptionOnly); +REGISTER_RESOURCE(AgentStatus, DescriptionOnly); } // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp index ad1389223..149fe14e1 100644 --- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp +++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp @@ -126,6 +126,15 @@ void ResponseNodeLoader::initializeAgentNode(const std::shared_ptr<ResponseNode> } } +void ResponseNodeLoader::initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node) { + auto agent_status = dynamic_cast<state::response::AgentStatus*>(response_node.get()); + if (agent_status != nullptr) { + agent_status->addRepository(provenance_repo_); + agent_status->addRepository(flow_file_repo_); + agent_status->setStateMonitor(update_sink_); + } +} + void ResponseNodeLoader::initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node) { auto configuration_checksums = dynamic_cast<state::response::ConfigurationChecksums*>(response_node.get()); if (configuration_checksums) { @@ -169,6 +178,7 @@ std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::loadResponseNodes initializeAgentIdentifier(response_node); initializeAgentMonitor(response_node); initializeAgentNode(response_node); + initializeAgentStatus(response_node); initializeConfigurationChecksums(response_node); initializeFlowMonitor(response_node, root); }
