This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f410c8d NIFI-8272 Delete stale metrics from REST API Prometheus
endpoint.
f410c8d is described below
commit f410c8df0a2b05cdf078e3d3f6b086ac7ebed5f9
Author: noblenumbat360 <[email protected]>
AuthorDate: Fri Oct 8 01:37:42 2021 +1100
NIFI-8272 Delete stale metrics from REST API Prometheus endpoint.
Added <scope>test</scope> tag to the nifi-web-api pom.xml and corrected
imports.
Signed-off-by: Matthew Burgess <[email protected]>
This closes #5447
---
.../apache/nifi/util/MockBulletinRepository.java | 3 +-
.../prometheus/util/AbstractMetricsRegistry.java | 9 ++
.../prometheus/util/PrometheusMetricsUtil.java | 17 ---
.../nifi-framework/nifi-web/nifi-web-api/pom.xml | 5 +
.../apache/nifi/web/StandardNiFiServiceFacade.java | 3 +-
.../nifi/web/StandardNiFiServiceFacadeSpec.groovy | 157 +++++++++++++++++++++
6 files changed, 175 insertions(+), 19 deletions(-)
diff --git
a/nifi-mock/src/main/java/org/apache/nifi/util/MockBulletinRepository.java
b/nifi-mock/src/main/java/org/apache/nifi/util/MockBulletinRepository.java
index bafdfdb..c273ae8 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockBulletinRepository.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockBulletinRepository.java
@@ -20,6 +20,7 @@ import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
+import java.util.ArrayList;
import java.util.List;
public class MockBulletinRepository implements BulletinRepository {
@@ -45,7 +46,7 @@ public class MockBulletinRepository implements
BulletinRepository {
@Override
public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
// TODO: Implement
- return null;
+ return new ArrayList<Bulletin>();
}
@Override
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/AbstractMetricsRegistry.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/AbstractMetricsRegistry.java
index 12c65e7..9163ed9 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/AbstractMetricsRegistry.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/AbstractMetricsRegistry.java
@@ -50,4 +50,13 @@ public class AbstractMetricsRegistry {
counter.labels(labels).inc(val);
}
+
+ public void clear() {
+ for (Gauge gauge : nameToGaugeMap.values()) {
+ gauge.clear();
+ }
+ for (Counter counter : nameToCounterMap.values()) {
+ counter.clear();
+ }
+ }
}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
index b0d6fef..f52a5cb 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
@@ -18,7 +18,6 @@
package org.apache.nifi.prometheus.util;
import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.SimpleCollector;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.status.ConnectionStatus;
@@ -34,8 +33,6 @@ import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -101,20 +98,6 @@ public class PrometheusMetricsUtil {
final String componentId = StringUtils.isEmpty(status.getId()) ?
DEFAULT_LABEL_STRING : status.getId();
final String componentName = StringUtils.isEmpty(status.getName()) ?
DEFAULT_LABEL_STRING : status.getName();
- // Clear all collectors to deal with removed/renamed components -- for
root PG only
- if("RootProcessGroup".equals(componentType)) {
- try {
- for (final Field field :
PrometheusMetricsUtil.class.getDeclaredFields()) {
- if (Modifier.isStatic(field.getModifiers()) &&
(field.get(null) instanceof SimpleCollector)) {
- SimpleCollector<?> sc = (SimpleCollector<?>)
(field.get(null));
- sc.clear();
- }
- }
- } catch (IllegalAccessException e) {
- // ignore
- }
- }
-
nifiMetricsRegistry.setDataPoint(status.getFlowFilesSent(),
"AMOUNT_FLOWFILES_SENT", instanceId, componentType, componentName, componentId,
parentPGId);
nifiMetricsRegistry.setDataPoint(status.getFlowFilesTransferred(),
"AMOUNT_FLOWFILES_TRANSFERRED", instanceId, componentType, componentName,
componentId, parentPGId);
nifiMetricsRegistry.setDataPoint(status.getFlowFilesReceived(),
"AMOUNT_FLOWFILES_RECEIVED", instanceId, componentType, componentName,
componentId, parentPGId);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
index 7a13dd2..4c855b2 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
@@ -432,5 +432,10 @@
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index b22a417..ee986dd 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -5587,9 +5587,10 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
@Override
public Collection<CollectorRegistry> generateFlowMetrics() {
-
final String instanceId =
StringUtils.isEmpty(controllerFacade.getInstanceId()) ? "" :
controllerFacade.getInstanceId();
ProcessGroupStatus rootPGStatus =
controllerFacade.getProcessGroupStatus("root");
+
+ nifiMetricsRegistry.clear();
PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry,
rootPGStatus, instanceId, "", "RootProcessGroup",
PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS.getValue());
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy
index a830a87..3018830 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.web
+import io.prometheus.client.CollectorRegistry
+import io.prometheus.client.exporter.common.TextFormat
import org.apache.nifi.authorization.AccessDeniedException
import org.apache.nifi.authorization.AccessPolicy
import org.apache.nifi.authorization.AuthorizableLookup
@@ -30,9 +32,19 @@ import org.apache.nifi.authorization.resource.ResourceFactory
import org.apache.nifi.authorization.user.NiFiUser
import org.apache.nifi.authorization.user.NiFiUserDetails
import org.apache.nifi.authorization.user.StandardNiFiUser
+import org.apache.nifi.connectable.Connection
+import org.apache.nifi.controller.flow.StandardFlowManager
+import org.apache.nifi.controller.repository.FlowFileEvent
+import org.apache.nifi.controller.repository.FlowFileEventRepository
import org.apache.nifi.controller.service.ControllerServiceProvider
+import org.apache.nifi.controller.status.PortStatus
+import org.apache.nifi.controller.status.ProcessGroupStatus
+import org.apache.nifi.controller.status.RunStatus
+import org.apache.nifi.diagnostics.StorageUsage
+import org.apache.nifi.diagnostics.SystemDiagnostics
import org.apache.nifi.reporting.Bulletin
import org.apache.nifi.reporting.BulletinRepository
+import org.apache.nifi.util.MockBulletinRepository
import org.apache.nifi.web.api.dto.AccessPolicyDTO
import org.apache.nifi.web.api.dto.BulletinDTO
import org.apache.nifi.web.api.dto.DtoFactory
@@ -898,6 +910,151 @@ class StandardNiFiServiceFacadeSpec extends Specification
{
}
+ def "Test REST API Prometheus Metrics Endpoint"() {
+ given:
+ def serviceFacade = new StandardNiFiServiceFacade()
+ BulletinRepository bulletinRepository = new MockBulletinRepository()
+ serviceFacade.setBulletinRepository(bulletinRepository)
+
+ ControllerFacade controllerFacade = Mock()
+ serviceFacade.setControllerFacade(controllerFacade)
+ controllerFacade.getInstanceId() >> "ABC"
+ controllerFacade.getMaxEventDrivenThreadCount() >> 1
+ controllerFacade.getMaxTimerDrivenThreadCount() >> 10
+
+ // Setting up storage repositories
+ StorageUsage flowFileStorage = new StorageUsage()
+ flowFileStorage.setIdentifier("flowFile")
+ flowFileStorage.setTotalSpace(222)
+ flowFileStorage.setFreeSpace(111)
+
+ StorageUsage contentStorage = new StorageUsage()
+ contentStorage.setIdentifier("default")
+ contentStorage.setTotalSpace(444)
+ contentStorage.setFreeSpace(111)
+ Map<String, StorageUsage> contentStorageMap = new HashMap<>()
+ contentStorageMap.put("default", contentStorage)
+
+ StorageUsage provenanceStorage = new StorageUsage()
+ provenanceStorage.setIdentifier("default")
+ provenanceStorage.setTotalSpace(666)
+ provenanceStorage.setFreeSpace(111)
+ Map<String, StorageUsage> provenanceStorageMap = new HashMap<>()
+ provenanceStorageMap.put("default", provenanceStorage)
+
+ // Setting up SystemDiagnostics
+ SystemDiagnostics systemDiagnostics = new SystemDiagnostics()
+ systemDiagnostics.setFlowFileRepositoryStorageUsage(flowFileStorage)
+ systemDiagnostics.setContentRepositoryStorageUsage(contentStorageMap)
+
systemDiagnostics.setProvenanceRepositoryStorageUsage(provenanceStorageMap)
+
+ controllerFacade.getSystemDiagnostics() >> systemDiagnostics
+
+ // Setting up flow
+ ProcessGroupStatus rootGroupStatus = new ProcessGroupStatus()
+ rootGroupStatus.setId("1234");
+ rootGroupStatus.setFlowFilesReceived(5);
+ rootGroupStatus.setBytesReceived(10000);
+ rootGroupStatus.setFlowFilesSent(10);
+ rootGroupStatus.setBytesSent(20000);
+ rootGroupStatus.setQueuedCount(100);
+ rootGroupStatus.setQueuedContentSize(1024L);
+ rootGroupStatus.setBytesRead(60000L);
+ rootGroupStatus.setBytesWritten(80000L);
+ rootGroupStatus.setActiveThreadCount(5);
+ rootGroupStatus.setName("root");
+ rootGroupStatus.setFlowFilesTransferred(5);
+ rootGroupStatus.setBytesTransferred(10000);
+ rootGroupStatus.setOutputContentSize(1000L);
+ rootGroupStatus.setInputContentSize(1000L);
+ rootGroupStatus.setOutputCount(100);
+ rootGroupStatus.setInputCount(1000);
+
+ PortStatus outputPortStatus = new PortStatus();
+ outputPortStatus.setId("9876");
+ outputPortStatus.setName("out");
+ outputPortStatus.setGroupId("1234");
+ outputPortStatus.setRunStatus(RunStatus.Stopped);
+ outputPortStatus.setActiveThreadCount(1);
+
+
rootGroupStatus.setOutputPortStatus(Collections.singletonList(outputPortStatus));
+ // Create a nested group status
+ ProcessGroupStatus groupStatus2 = new ProcessGroupStatus();
+ groupStatus2.setFlowFilesReceived(5);
+ groupStatus2.setBytesReceived(10000);
+ groupStatus2.setFlowFilesSent(10);
+ groupStatus2.setBytesSent(20000);
+ groupStatus2.setQueuedCount(100);
+ groupStatus2.setQueuedContentSize(1024L);
+ groupStatus2.setActiveThreadCount(2);
+ groupStatus2.setBytesRead(12345L);
+ groupStatus2.setBytesWritten(11111L);
+ groupStatus2.setFlowFilesTransferred(5);
+ groupStatus2.setBytesTransferred(10000);
+ groupStatus2.setOutputContentSize(1000L);
+ groupStatus2.setInputContentSize(1000L);
+ groupStatus2.setOutputCount(100);
+ groupStatus2.setInputCount(1000);
+ groupStatus2.setId("3378");
+ groupStatus2.setName("nestedPG");
+ Collection<ProcessGroupStatus> nestedGroupStatuses = new ArrayList<>();
+ nestedGroupStatuses.add(groupStatus2);
+ rootGroupStatus.setProcessGroupStatus(nestedGroupStatuses);
+
+ // setting up flowFile events
+ controllerFacade.getProcessGroupStatus("root") >> rootGroupStatus
+ FlowFileEventRepository flowFileEventRepository = Mock()
+ controllerFacade.getFlowFileEventRepository() >>
flowFileEventRepository
+ FlowFileEvent aggregateEvent = Mock()
+ flowFileEventRepository.reportAggregateEvent() >> aggregateEvent
+
+ // setting up connections (empty list for testing)
+ Set<Connection> connections = new HashSet()
+ StandardFlowManager flowManager = Mock()
+ controllerFacade.getFlowManager() >> flowManager
+ flowManager.findAllConnections() >> connections
+
+ when:
+ Collection<CollectorRegistry> allRegistries =
serviceFacade.generateFlowMetrics()
+
+ // Converts metrics into a String for testing
+ Writer writer = new StringWriter();
+ for (CollectorRegistry collectorRegistry : allRegistries) {
+ TextFormat.write004(writer,
collectorRegistry.metricFamilySamples());
+ }
+ String output = writer.toString();
+ writer.close()
+
+ // rename root group and generate metrics again to a different string
+ rootGroupStatus.setName("rootroot")
+ allRegistries = serviceFacade.generateFlowMetrics()
+ writer = new StringWriter()
+ for (CollectorRegistry collectorRegistry : allRegistries) {
+ TextFormat.write004(writer,
collectorRegistry.metricFamilySamples())
+ }
+ String output2 = writer.toString()
+ writer.close()
+
+ then:
+ // flow metrics
+
output.contains("nifi_amount_flowfiles_received{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",}
5.0");
+
output.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",}
5.0");
+
output.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"ProcessGroup\",component_name=\"nestedPG\",component_id=\"3378\",parent_id=\"1234\",}
2.0");
+
+ // jvm
+ output.contains("nifi_jvm_heap_used{instance=\"ABC\",}")
+ output.contains("# HELP nifi_jvm_heap_used NiFi JVM heap used")
+ output.contains("# TYPE nifi_jvm_heap_used gauge")
+ output.contains("nifi_jvm_thread_count{instance=\"ABC\",}")
+
+ // test that renamed items are in the metrics output and that the
previously named versions have been removed from the metrics output.
+
output2.contains("nifi_amount_flowfiles_received{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",}
5.0");
+
output2.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",}
5.0");
+
!output2.contains("nifi_amount_flowfiles_received{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",}
5.0");
+
!output2.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",}
5.0");
+
+ }
+
private UserGroupDTO createUserGroupDTO() {
new UserGroupDTO(id: 'group-1', name: 'test group', users:
[createUserEntity()] as Set)
}