This is an automated email from the ASF dual-hosted git repository.
kdoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 99d6ed2 NIFI-6352: Add ability to report all component metrics to
Prometheus
99d6ed2 is described below
commit 99d6ed244cd8808646eaa9fbf446e4be36defafc
Author: Matthew Burgess <[email protected]>
AuthorDate: Wed Jun 5 12:32:22 2019 -0400
NIFI-6352: Add ability to report all component metrics to Prometheus
This closes #3519.
Signed-off-by: Kevin Doran <[email protected]>
---
.../prometheus/PrometheusReportingTask.java | 32 +-
.../reporting/prometheus/PrometheusServer.java | 11 +-
.../prometheus/api/PrometheusMetricsUtil.java | 330 ++++++++++++++++-----
.../prometheus/TestPrometheusReportingTask.java | 4 +-
4 files changed, 296 insertions(+), 81 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
index daaddd3..7dcfe73 100644
---
a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
+++
b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusReportingTask.java
@@ -40,6 +40,10 @@ import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.jetty.server.Server;
+import static
org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS;
+import static
org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_PG;
+import static
org.apache.nifi.reporting.prometheus.api.PrometheusMetricsUtil.METRICS_STRATEGY_ROOT;
+
@Tags({ "reporting", "prometheus", "metrics", "time series data" })
@CapabilityDescription("Reports metrics in Prometheus format by creating
/metrics http endpoint which can be used for external monitoring of the
application."
+ " The reporting task reports a set of metrics regarding the JVM
(optional) and the NiFi instance")
@@ -59,7 +63,8 @@ public class PrometheusReportingTask extends
AbstractReportingTask {
+ "specified in the SSL Context Service");
public static final PropertyDescriptor METRICS_ENDPOINT_PORT = new
PropertyDescriptor.Builder()
- .name("Prometheus Metrics Endpoint Port")
+ .name("prometheus-reporting-task-metrics-endpoint-port")
+ .displayName("Prometheus Metrics Endpoint Port")
.description("The Port where prometheus metrics can be accessed")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@@ -68,7 +73,8 @@ public class PrometheusReportingTask extends
AbstractReportingTask {
.build();
public static final PropertyDescriptor INSTANCE_ID = new
PropertyDescriptor.Builder()
- .name("Instance ID")
+ .name("prometheus-reporting-task-instance-id")
+ .displayName("Instance ID")
.description("Id of this NiFi instance to be included in the
metrics sent to Prometheus")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@@ -76,16 +82,27 @@ public class PrometheusReportingTask extends
AbstractReportingTask {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ public static final PropertyDescriptor METRICS_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("prometheus-reporting-task-metrics-strategy")
+ .displayName("Metrics Reporting Strategy")
+ .description("The granularity on which to report metrics. Options
include only the root process group, all process groups, or all components")
+ .allowableValues(METRICS_STRATEGY_ROOT, METRICS_STRATEGY_PG,
METRICS_STRATEGY_COMPONENTS)
+ .defaultValue(METRICS_STRATEGY_COMPONENTS.getValue())
+ .required(true)
+ .build();
+
public static final PropertyDescriptor SEND_JVM_METRICS = new
PropertyDescriptor.Builder()
- .name("Send JVM-metrics")
- .description("Send JVM-metrics in addition to the Nifi-metrics")
+ .name("prometheus-reporting-task-metrics-send-jvm")
+ .displayName("Send JVM metrics")
+ .description("Send JVM metrics in addition to the NiFi metrics")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
public static final PropertyDescriptor SSL_CONTEXT = new
PropertyDescriptor.Builder()
- .name("SSL Context Service")
+ .name("prometheus-reporting-task-ssl-context")
+ .displayName("SSL Context Service")
.description("The SSL Context Service to use in order to secure
the server. If specified, the server will"
+ "accept only HTTPS requests; otherwise, the server will
accept only HTTP requests")
.required(false)
@@ -93,7 +110,8 @@ public class PrometheusReportingTask extends
AbstractReportingTask {
.build();
public static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
- .name("Client Authentication")
+ .name("prometheus-reporting-task-client-auth")
+ .displayName("Client Authentication")
.description("Specifies whether or not the Reporting Task should
authenticate clients. This value is ignored if the <SSL Context Service> "
+ "Property is not specified or the SSL Context provided
uses only a KeyStore and not a TrustStore.")
.required(true)
@@ -107,6 +125,7 @@ public class PrometheusReportingTask extends
AbstractReportingTask {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(METRICS_ENDPOINT_PORT);
props.add(INSTANCE_ID);
+ props.add(METRICS_STRATEGY);
props.add(SEND_JVM_METRICS);
props.add(SSL_CONTEXT);
props.add(CLIENT_AUTH);
@@ -165,5 +184,6 @@ public class PrometheusReportingTask extends
AbstractReportingTask {
@Override
public void onTrigger(final ReportingContext context) {
this.prometheusServer.setReportingContext(context);
+
this.prometheusServer.setMetricsStrategy(context.getProperty(METRICS_STRATEGY).getValue());
}
}
diff --git
a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java
b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java
index acaa447..76ef126 100644
---
a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java
+++
b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/PrometheusServer.java
@@ -54,6 +54,7 @@ public class PrometheusServer {
private Server server;
private ServletContextHandler handler;
private ReportingContext context;
+ private String metricsStrategy;
private boolean sendJvmMetrics;
private String instanceId;
@@ -70,7 +71,8 @@ public class PrometheusServer {
rootGroupStatus =
PrometheusServer.this.context.getEventAccess().getControllerStatus();
ServletOutputStream response = resp.getOutputStream();
OutputStreamWriter osw = new OutputStreamWriter(response);
- nifiRegistry =
PrometheusMetricsUtil.createNifiMetrics(rootGroupStatus,
PrometheusServer.this.instanceId);
+
+ nifiRegistry =
PrometheusMetricsUtil.createNifiMetrics(rootGroupStatus,
PrometheusServer.this.instanceId, "", "RootProcessGroup", metricsStrategy);
TextFormat.write004(osw, nifiRegistry.metricFamilySamples());
if (PrometheusServer.this.sendJvmMetrics == true) {
@@ -167,4 +169,11 @@ public class PrometheusServer {
this.instanceId = iid;
}
+ public String getMetricsStrategy() {
+ return metricsStrategy;
+ }
+
+ public void setMetricsStrategy(String metricsStrategy) {
+ this.metricsStrategy = metricsStrategy;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
index ebd0102..f4fa642 100644
---
a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
+++
b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/main/java/org/apache/nifi/reporting/prometheus/api/PrometheusMetricsUtil.java
@@ -17,9 +17,11 @@
package org.apache.nifi.reporting.prometheus.api;
-import java.util.Collection;
import java.util.Map;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
@@ -27,100 +29,116 @@ import com.yammer.metrics.core.VirtualMachineMetrics;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.TransmissionStatus;
public class PrometheusMetricsUtil {
+
+ public static final AllowableValue METRICS_STRATEGY_ROOT = new
AllowableValue("Root Process Group", "Root Process Group",
+ "Send rollup metrics for the entire root process group");
+ public static final AllowableValue METRICS_STRATEGY_PG = new
AllowableValue("All Process Groups", "All Process Groups",
+ "Send metrics for each process group");
+ public static final AllowableValue METRICS_STRATEGY_COMPONENTS = new
AllowableValue("All Components", "All Components",
+ "Send metrics for each component in the system, to include
processors, connections, controller services, etc.");
+
private static final CollectorRegistry NIFI_REGISTRY = new
CollectorRegistry();
private static final CollectorRegistry JVM_REGISTRY = new
CollectorRegistry();
+ // Process Group metrics
private static final Gauge AMOUNT_FLOWFILES_SENT = Gauge.build()
- .name("nifi_process_group_amount_flowfiles_sent")
- .help("Total number of FlowFiles in ProcessGroup sent")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_flowfiles_sent")
+ .help("Total number of FlowFiles sent by the component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_FLOWFILES_TRANSFERRED = Gauge.build()
- .name("nifi_process_group_amount_flowfiles_transferred")
- .help("Total number of FlowFiles in ProcessGroup transferred")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_flowfiles_transferred")
+ .help("Total number of FlowFiles transferred by the component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_FLOWFILES_RECEIVED = Gauge.build()
- .name("nifi_process_group_amount_flowfiles_received")
- .help("Total number of FlowFiles in ProcessGroup received")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_flowfiles_received")
+ .help("Total number of FlowFiles received by the component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_SENT = Gauge.build()
- .name("nifi_process_group_amount_bytes_sent")
- .help("Total number of Bytes in ProcessGroup sent")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_bytes_sent")
+ .help("Total number of bytes sent by the component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_READ = Gauge.build()
- .name("nifi_process_group_amount_bytes_read")
- .help("Total number of Bytes in ProcessGroup read")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_bytes_read")
+ .help("Total number of bytes read by the component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_WRITTEN = Gauge.build()
- .name("nifi_process_group_amount_bytes_written")
- .help("Total number of Bytes in ProcessGroup written")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_bytes_written")
+ .help("Total number of bytes written by the component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_RECEIVED = Gauge.build()
- .name("nifi_process_group_amount_bytes_received")
- .help("Total number of Bytes in ProcessGroup received")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_bytes_received")
+ .help("Total number of bytes received by the component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_BYTES_TRANSFERRED = Gauge.build()
- .name("nifi_process_group_amount_bytes_transferred")
- .help("Total number of Bytes in ProcessGroup transferred")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_bytes_transferred")
+ .help("Total number of Bytes transferred by the component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id")
.register(NIFI_REGISTRY);
-
private static final Gauge AMOUNT_THREADS_TOTAL_ACTIVE = Gauge.build()
- .name("nifi_process_group_amount_threads_active")
- .help("Total number of threads in ProcessGroup active")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_threads_active")
+ .help("Total number of threads active for the component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id")
.register(NIFI_REGISTRY);
private static final Gauge SIZE_CONTENT_OUTPUT_TOTAL = Gauge.build()
- .name("nifi_process_group_size_content_output_total")
- .help("Total size of content output in ProcessGroup")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_size_content_output_total")
+ .help("Total size of content output by the component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id",
+ "source_id", "source_name", "destination_id",
"destination_name")
.register(NIFI_REGISTRY);
private static final Gauge SIZE_CONTENT_INPUT_TOTAL = Gauge.build()
- .name("nifi_process_group_size_content_input_total")
- .help("Total size of content input in ProcessGroup")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_size_content_input_total")
+ .help("Total size of content input by the component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id",
+ "source_id", "source_name", "destination_id",
"destination_name")
.register(NIFI_REGISTRY);
private static final Gauge SIZE_CONTENT_QUEUED_TOTAL = Gauge.build()
- .name("nifi_process_group_size_content_queued_total")
- .help("Total size of content queued in ProcessGroup")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_size_content_queued_total")
+ .help("Total size of content queued in the component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id",
+ "source_id", "source_name", "destination_id",
"destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_ITEMS_OUTPUT = Gauge.build()
- .name("nifi_process_group_amount_items_output")
- .help("Total amount of items in ProcessGroup output")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_items_output")
+ .help("Total number of items output by the component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id",
+ "source_id", "source_name", "destination_id",
"destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_ITEMS_INPUT = Gauge.build()
- .name("nifi_process_group_amount_items_input")
- .help("Total amount of items in ProcessGroup input")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_items_input")
+ .help("Total number of items input by the component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id",
+ "source_id", "source_name", "destination_id",
"destination_name")
.register(NIFI_REGISTRY);
private static final Gauge AMOUNT_ITEMS_QUEUED = Gauge.build()
- .name("nifi_process_group_amount_items_queued")
- .help("Total amount of items in ProcessGroup queued")
- .labelNames("instance", "process_group_name", "process_group_id")
+ .name("nifi_amount_items_queued")
+ .help("Total number of items queued by the component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id",
+ "source_id", "source_name", "destination_id",
"destination_name")
.register(NIFI_REGISTRY);
private static final Gauge PROCESSOR_COUNTERS = Gauge.build()
@@ -129,6 +147,60 @@ public class PrometheusMetricsUtil {
.labelNames("processor_name", "counter_name", "processor_id",
"instance")
.register(NIFI_REGISTRY);
+ // Connection metrics
+ private static final Gauge BACKPRESSURE_BYTES_THRESHOLD = Gauge.build()
+ .name("nifi_backpressure_bytes_threshold")
+ .help("The number of bytes that can be queued before backpressure
is applied")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id",
+ "source_id", "source_name", "destination_id",
"destination_name")
+ .register(NIFI_REGISTRY);
+
+ private static final Gauge BACKPRESSURE_OBJECT_THRESHOLD = Gauge.build()
+ .name("nifi_backpressure_object_threshold")
+ .help("The number of flow files that can be queued before
backpressure is applied")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id",
+ "source_id", "source_name", "destination_id",
"destination_name")
+ .register(NIFI_REGISTRY);
+
+ private static final Gauge IS_BACKPRESSURE_ENABLED = Gauge.build()
+ .name("nifi_backpressure_enabled")
+ .help("Whether backpressure has been applied for this component.
Values are 0 or 1")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id",
+ "source_id", "source_name", "destination_id",
"destination_name")
+ .register(NIFI_REGISTRY);
+
+ // Port metrics
+ private static final Gauge IS_TRANSMITTING = Gauge.build()
+ .name("nifi_transmitting")
+ .help("Whether this component is transmitting data. Values are 0
or 1")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id", "run_status")
+ .register(NIFI_REGISTRY);
+
+ // Remote Process Group (RPG) metrics
+ private static final Gauge ACTIVE_REMOTE_PORT_COUNT = Gauge.build()
+ .name("nifi_active_remote_port_count")
+ .help("The number of active remote ports associated with this
component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id",
+ "source_id", "source_name", "destination_id",
"destination_name")
+ .register(NIFI_REGISTRY);
+
+ private static final Gauge INACTIVE_REMOTE_PORT_COUNT = Gauge.build()
+ .name("nifi_inactive_remote_port_count")
+ .help("The number of inactive remote ports associated with this
component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id",
+ "source_id", "source_name", "destination_id",
"destination_name")
+ .register(NIFI_REGISTRY);
+
+ private static final Gauge AVERAGE_LINEAGE_DURATION = Gauge.build()
+ .name("nifi_average_lineage_duration")
+ .help("The average lineage duration (in milliseconds) for all flow
file processed by this component")
+ .labelNames("instance", "component_type", "component_name",
"component_id", "parent_id",
+ "source_id", "source_name", "destination_id",
"destination_name")
+ .register(NIFI_REGISTRY);
+
+ ///////////////////////////////////////////////////////////////
+ // JVM Metrics
+ ///////////////////////////////////////////////////////////////
private static final Gauge JVM_HEAP_USED = Gauge.build()
.name("nifi_jvm_heap_used")
.help("NiFi JVM heap used")
@@ -171,45 +243,159 @@ public class PrometheusMetricsUtil {
.labelNames("instance")
.register(JVM_REGISTRY);
- public static CollectorRegistry createNifiMetrics(ProcessGroupStatus
status, String instanceId) {
+ public static CollectorRegistry createNifiMetrics(ProcessGroupStatus
status, String instanceId, String parentPGId, String componentType, String
metricsStrategy) {
+
+ final String componentId = status.getId();
+ final String componentName = status.getName();
+
+ AMOUNT_FLOWFILES_SENT.labels(instanceId, componentType, componentName,
componentId, parentPGId).set(status.getFlowFilesSent());
+ AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, componentType,
componentName, componentId, parentPGId).set(status.getFlowFilesTransferred());
+ AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, componentType,
componentName, componentId, parentPGId).set(status.getFlowFilesReceived());
+
+ AMOUNT_BYTES_SENT.labels(instanceId, componentType, componentName,
componentId, parentPGId).set(status.getBytesSent());
+ AMOUNT_BYTES_READ.labels(instanceId, componentType, componentName,
componentId, parentPGId).set(status.getBytesRead());
+ AMOUNT_BYTES_WRITTEN.labels(instanceId, componentType, componentName,
componentId, parentPGId).set(status.getBytesWritten());
+ AMOUNT_BYTES_RECEIVED.labels(instanceId, componentType, componentName,
componentId, parentPGId).set(status.getBytesReceived());
+ AMOUNT_BYTES_TRANSFERRED.labels(instanceId, componentType,
componentName, componentId, parentPGId).set(status.getBytesTransferred());
+
+ SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, componentType,
componentName, componentId, parentPGId, "", "", "", "")
+ .set(status.getOutputContentSize());
+ SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, componentType,
componentName, componentId, parentPGId, "", "", "", "")
+ .set(status.getInputContentSize());
+ SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, componentType,
componentName, componentId, parentPGId, "", "", "", "")
+ .set(status.getQueuedContentSize());
- final String processGroupId = status.getId();
- final String processGroupName = status.getName();
- Collection<ProcessorStatus> processorStatus =
status.getProcessorStatus();
+ AMOUNT_ITEMS_OUTPUT.labels(instanceId, componentType, componentName,
componentId, parentPGId, "", "", "", "")
+ .set(status.getOutputCount());
+ AMOUNT_ITEMS_INPUT.labels(instanceId, componentType, componentName,
componentId, parentPGId, "", "", "", "")
+ .set(status.getInputCount());
+ AMOUNT_ITEMS_QUEUED.labels(instanceId, componentType, componentName,
componentId, parentPGId,"", "", "", "")
+ .set(status.getQueuedCount());
- AMOUNT_FLOWFILES_SENT.labels(instanceId, processGroupName,
processGroupId).set(status.getFlowFilesSent());
- AMOUNT_FLOWFILES_TRANSFERRED.labels(instanceId, processGroupName,
processGroupId).set(status.getFlowFilesTransferred());
- AMOUNT_FLOWFILES_RECEIVED.labels(instanceId, processGroupName,
processGroupId).set(status.getFlowFilesReceived());
+ AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, componentType,
componentName, componentId, parentPGId).set(status.getActiveThreadCount());
- AMOUNT_BYTES_SENT.labels(instanceId, processGroupName,
processGroupId).set(status.getBytesSent());
- AMOUNT_BYTES_READ.labels(instanceId, processGroupName,
processGroupId).set(status.getBytesRead());
- AMOUNT_BYTES_WRITTEN.labels(instanceId, processGroupName,
processGroupId).set(status.getBytesWritten());
- AMOUNT_BYTES_RECEIVED.labels(instanceId, processGroupName,
processGroupId).set(status.getBytesReceived());
- AMOUNT_BYTES_TRANSFERRED.labels(instanceId, processGroupName,
processGroupId).set(status.getBytesTransferred());
+ // Report metrics for child process groups if specified
+ if (METRICS_STRATEGY_PG.getValue().equals(metricsStrategy) ||
METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
+ status.getProcessGroupStatus().forEach((childGroupStatus) ->
createNifiMetrics(childGroupStatus, instanceId, parentPGId, "ProcessGroup",
metricsStrategy));
+ }
- SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId, processGroupName,
processGroupId).set(status.getOutputContentSize());
- SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, processGroupName,
processGroupId).set(status.getInputContentSize());
- SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId, processGroupName,
processGroupId).set(status.getQueuedContentSize());
+ if (METRICS_STRATEGY_COMPONENTS.getValue().equals(metricsStrategy)) {
+ // Report metrics for all components
+ for(ProcessorStatus processorStatus : status.getProcessorStatus())
{
+ Map<String, Long> counters = processorStatus.getCounters();
- AMOUNT_ITEMS_OUTPUT.labels(instanceId, processGroupName,
processGroupId).set(status.getOutputCount());
- AMOUNT_ITEMS_INPUT.labels(instanceId, processGroupName,
processGroupId).set(status.getInputCount());
- AMOUNT_ITEMS_QUEUED.labels(instanceId, processGroupName,
processGroupId).set(status.getQueuedCount());
+ if(counters != null) {
+ counters.entrySet().stream().forEach(entry ->
PROCESSOR_COUNTERS
+ .labels(processorStatus.getName(), entry.getKey(),
processorStatus.getId(), instanceId).set(entry.getValue()));
+ }
+ }
+ for(ConnectionStatus connectionStatus :
status.getConnectionStatus()) {
+ final String connComponentId = connectionStatus.getId();
+ final String connComponentName = connectionStatus.getName();
+ final String sourceId = connectionStatus.getSourceId();
+ final String sourceName = connectionStatus.getSourceName();
+ final String destinationId =
connectionStatus.getDestinationId();
+ final String destinationName =
connectionStatus.getDestinationName();
+ final String parentId = connectionStatus.getGroupId();
+ final String connComponentType = "Connection";
+ SIZE_CONTENT_OUTPUT_TOTAL.labels(instanceId,
connComponentType, connComponentName, connComponentId, parentId, sourceId,
sourceName, destinationId, destinationName)
+ .set(connectionStatus.getOutputBytes());
+ SIZE_CONTENT_INPUT_TOTAL.labels(instanceId, connComponentType,
connComponentName, connComponentId, parentId, sourceId, sourceName,
destinationId, destinationName)
+ .set(connectionStatus.getInputBytes());
+ SIZE_CONTENT_QUEUED_TOTAL.labels(instanceId,
connComponentType, connComponentName, connComponentId, parentId, sourceId,
sourceName, destinationId, destinationName)
+ .set(connectionStatus.getQueuedBytes());
+
+ AMOUNT_ITEMS_OUTPUT.labels(instanceId, connComponentType,
connComponentName, connComponentId, parentId, sourceId, sourceName,
destinationId, destinationName)
+ .set(connectionStatus.getOutputCount());
+ AMOUNT_ITEMS_INPUT.labels(instanceId, connComponentType,
connComponentName, connComponentId, parentId, sourceId, sourceName,
destinationId, destinationName)
+ .set(connectionStatus.getInputCount());
+ AMOUNT_ITEMS_QUEUED.labels(instanceId, connComponentType,
connComponentName, connComponentId, parentId, sourceId, sourceName,
destinationId, destinationName)
+ .set(connectionStatus.getQueuedCount());
+
+ BACKPRESSURE_BYTES_THRESHOLD.labels(instanceId,
connComponentType, connComponentName, connComponentId, parentId, sourceId,
sourceName, destinationId, destinationName)
+ .set(connectionStatus.getBackPressureBytesThreshold());
+ BACKPRESSURE_OBJECT_THRESHOLD.labels(instanceId,
connComponentType, connComponentName, connComponentId, parentId, sourceId,
sourceName, destinationId, destinationName)
+
.set(connectionStatus.getBackPressureObjectThreshold());
+ boolean isBackpressureEnabled =
(connectionStatus.getBackPressureObjectThreshold() > 0 &&
connectionStatus.getBackPressureObjectThreshold() <=
connectionStatus.getQueuedCount())
+ || (connectionStatus.getBackPressureBytesThreshold() >
0 && connectionStatus.getBackPressureBytesThreshold() <=
connectionStatus.getMaxQueuedBytes());
+ IS_BACKPRESSURE_ENABLED.labels(instanceId, connComponentType,
connComponentName, connComponentId, parentId, sourceId, sourceName,
destinationId, destinationName)
+ .set(isBackpressureEnabled ? 1 : 0);
+ }
+ for(PortStatus portStatus : status.getInputPortStatus()) {
+ final String portComponentId = portStatus.getId();
+ final String portComponentName = portStatus.getName();
+ final String parentId = portStatus.getGroupId();
+ final String portComponentType = "InputPort";
+ AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType,
portComponentName, portComponentId,
parentId).set(portStatus.getFlowFilesSent());
+ AMOUNT_FLOWFILES_RECEIVED.labels(instanceId,
portComponentType, portComponentName, portComponentId,
parentId).set(portStatus.getFlowFilesReceived());
+
+ AMOUNT_BYTES_SENT.labels(instanceId, portComponentType,
portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
+ AMOUNT_BYTES_READ.labels(instanceId, portComponentType,
portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
+ AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType,
portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
+ AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType,
portComponentName, portComponentId,
parentId).set(portStatus.getBytesReceived());
+
+ AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType,
portComponentName, portComponentId, parentId, "", "", "", "")
+ .set(portStatus.getOutputCount());
+ AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType,
portComponentName, portComponentId, parentId, "", "", "", "")
+ .set(portStatus.getInputCount());
+
+ final Boolean isTransmitting = portStatus.isTransmitting();
+ IS_TRANSMITTING.labels(instanceId, portComponentType,
portComponentName, portComponentId, parentId, portStatus.getRunStatus().name())
+ .set(isTransmitting == null ? 0 : (isTransmitting ? 1
: 0));
+
+ AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId,
portComponentType, portComponentName, portComponentId,
parentId).set(portStatus.getActiveThreadCount());
+ }
+ for(PortStatus portStatus : status.getOutputPortStatus()) {
+ final String portComponentId = portStatus.getId();
+ final String portComponentName = portStatus.getName();
+ final String parentId = portStatus.getGroupId();
+ final String portComponentType = "OutputPort";
+ AMOUNT_FLOWFILES_SENT.labels(instanceId, portComponentType,
portComponentName, portComponentId,
parentId).set(portStatus.getFlowFilesSent());
+ AMOUNT_FLOWFILES_RECEIVED.labels(instanceId,
portComponentType, portComponentName,
portComponentId).set(portStatus.getFlowFilesReceived());
+
+ AMOUNT_BYTES_SENT.labels(instanceId, portComponentType,
portComponentName, portComponentId, parentId).set(portStatus.getBytesSent());
+ AMOUNT_BYTES_READ.labels(instanceId, portComponentType,
portComponentName, portComponentId, parentId).set(portStatus.getInputBytes());
+ AMOUNT_BYTES_WRITTEN.labels(instanceId, portComponentType,
portComponentName, portComponentId, parentId).set(portStatus.getOutputBytes());
+ AMOUNT_BYTES_RECEIVED.labels(instanceId, portComponentType,
portComponentName, portComponentId,
parentId).set(portStatus.getBytesReceived());
+
+ AMOUNT_ITEMS_OUTPUT.labels(instanceId, portComponentType,
portComponentName, portComponentId, parentId, "", "", "", "")
+ .set(portStatus.getOutputCount());
+ AMOUNT_ITEMS_INPUT.labels(instanceId, portComponentType,
portComponentName, portComponentId, parentId, "", "", "", "")
+ .set(portStatus.getInputCount());
+
+ final Boolean isTransmitting = portStatus.isTransmitting();
+ IS_TRANSMITTING.labels(instanceId, portComponentType,
portComponentName, portComponentId, parentId, portStatus.getRunStatus().name())
+ .set(isTransmitting == null ? 0 : (isTransmitting ? 1
: 0));
+
+ AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId,
portComponentType, portComponentName, portComponentId,
parentId).set(portStatus.getActiveThreadCount());
+ }
+ for(RemoteProcessGroupStatus remoteProcessGroupStatus :
status.getRemoteProcessGroupStatus()) {
+ final String rpgComponentId = remoteProcessGroupStatus.getId();
+ final String rpgComponentName =
remoteProcessGroupStatus.getName();
+ final String parentId = remoteProcessGroupStatus.getGroupId();
+ final String rpgComponentType = "RemoteProcessGroup";
+
+ AMOUNT_BYTES_WRITTEN.labels(instanceId, rpgComponentType,
rpgComponentName, rpgComponentId,
parentId).set(remoteProcessGroupStatus.getSentContentSize());
+ AMOUNT_BYTES_RECEIVED.labels(instanceId, rpgComponentType,
rpgComponentName, rpgComponentId,
parentId).set(remoteProcessGroupStatus.getReceivedContentSize());
+ AMOUNT_ITEMS_OUTPUT.labels(instanceId, rpgComponentType,
rpgComponentName, rpgComponentId, parentId, "", "", "", "")
+ .set(remoteProcessGroupStatus.getSentCount());
+ AMOUNT_ITEMS_INPUT.labels(instanceId, rpgComponentType,
rpgComponentName, rpgComponentId, parentId, "", "", "", "")
+ .set(remoteProcessGroupStatus.getReceivedCount());
+ ACTIVE_REMOTE_PORT_COUNT.labels(instanceId, rpgComponentType,
rpgComponentName, rpgComponentId,
parentId).set(remoteProcessGroupStatus.getActiveRemotePortCount());
+ INACTIVE_REMOTE_PORT_COUNT.labels(instanceId,
rpgComponentType, rpgComponentName, rpgComponentId,
parentId).set(remoteProcessGroupStatus.getInactiveRemotePortCount());
- AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId, processGroupName,
processGroupId).set(status.getActiveThreadCount());
+ AVERAGE_LINEAGE_DURATION.labels(instanceId, rpgComponentType,
rpgComponentName, rpgComponentId,
parentId).set(remoteProcessGroupStatus.getAverageLineageDuration());
- for (ProcessorStatus pstatus : processorStatus) {
- Map<String, Long> counters = pstatus.getCounters();
+ IS_TRANSMITTING.labels(instanceId, rpgComponentType,
rpgComponentName, rpgComponentId, parentId,
remoteProcessGroupStatus.getTransmissionStatus().name())
+
.set(TransmissionStatus.Transmitting.equals(remoteProcessGroupStatus.getTransmissionStatus())
? 1 : 0);
- if(counters != null) {
- counters.entrySet().stream().forEach(entry ->
PROCESSOR_COUNTERS
- .labels(pstatus.getName(), entry.getKey(),
pstatus.getId(), instanceId).set(entry.getValue()));
+ AMOUNT_THREADS_TOTAL_ACTIVE.labels(instanceId,
rpgComponentType, rpgComponentName, rpgComponentId,
parentId).set(remoteProcessGroupStatus.getActiveThreadCount());
}
}
return NIFI_REGISTRY;
-
}
public static CollectorRegistry createJvmMetrics(VirtualMachineMetrics
jvmMetrics, String instanceId) {
diff --git
a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
index bba7a8b..8cb18b2 100644
---
a/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
+++
b/nifi-nar-bundles/nifi-prometheus-bundle/nifi-prometheus-reporting-task/src/test/java/org/apache/nifi/reporting/prometheus/TestPrometheusReportingTask.java
@@ -103,9 +103,9 @@ public class TestPrometheusReportingTask {
HttpEntity entity = response.getEntity();
String content = EntityUtils.toString(entity);
Assert.assertEquals(true, content.contains(
-
"nifi_process_group_amount_flowfiles_received{instance=\"localhost\",process_group_name=\"root\",process_group_id=\"1234\",}
5.0"));
+
"nifi_amount_flowfiles_received{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",}
5.0"));
Assert.assertEquals(true, content.contains(
-
"nifi_process_group_amount_threads_active{instance=\"localhost\",process_group_name=\"root\",process_group_id=\"1234\",}
5.0"));
+
"nifi_amount_threads_active{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",}
5.0"));
}
}