This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f410c8d  NIFI-8272 Delete stale metrics from REST API Prometheus 
endpoint.
f410c8d is described below

commit f410c8df0a2b05cdf078e3d3f6b086ac7ebed5f9
Author: noblenumbat360 <[email protected]>
AuthorDate: Fri Oct 8 01:37:42 2021 +1100

    NIFI-8272 Delete stale metrics from REST API Prometheus endpoint.
    
    Added <scope>test</scope> tag to the nifi-web-api pom.xml and corrected 
imports.
    
    Signed-off-by: Matthew Burgess <[email protected]>
    
    This closes #5447
---
 .../apache/nifi/util/MockBulletinRepository.java   |   3 +-
 .../prometheus/util/AbstractMetricsRegistry.java   |   9 ++
 .../prometheus/util/PrometheusMetricsUtil.java     |  17 ---
 .../nifi-framework/nifi-web/nifi-web-api/pom.xml   |   5 +
 .../apache/nifi/web/StandardNiFiServiceFacade.java |   3 +-
 .../nifi/web/StandardNiFiServiceFacadeSpec.groovy  | 157 +++++++++++++++++++++
 6 files changed, 175 insertions(+), 19 deletions(-)

diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockBulletinRepository.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockBulletinRepository.java
index bafdfdb..c273ae8 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockBulletinRepository.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockBulletinRepository.java
@@ -20,6 +20,7 @@ import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinQuery;
 import org.apache.nifi.reporting.BulletinRepository;
 
+import java.util.ArrayList;
 import java.util.List;
 
 public class MockBulletinRepository implements BulletinRepository {
@@ -45,7 +46,7 @@ public class MockBulletinRepository implements 
BulletinRepository {
     @Override
     public List<Bulletin> findBulletins(BulletinQuery bulletinQuery) {
         // TODO: Implement
-        return null;
+        return new ArrayList<Bulletin>();
     }
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/AbstractMetricsRegistry.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/AbstractMetricsRegistry.java
index 12c65e7..9163ed9 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/AbstractMetricsRegistry.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/AbstractMetricsRegistry.java
@@ -50,4 +50,13 @@ public class AbstractMetricsRegistry {
 
         counter.labels(labels).inc(val);
     }
+
+    public void clear() {
+        for (Gauge gauge : nameToGaugeMap.values()) {
+            gauge.clear();
+        }
+        for (Counter counter : nameToCounterMap.values()) {
+            counter.clear();
+        }
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
index b0d6fef..f52a5cb 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/PrometheusMetricsUtil.java
@@ -18,7 +18,6 @@
 package org.apache.nifi.prometheus.util;
 
 import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.SimpleCollector;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.status.ConnectionStatus;
@@ -34,8 +33,6 @@ import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StringUtils;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -101,20 +98,6 @@ public class PrometheusMetricsUtil {
         final String componentId = StringUtils.isEmpty(status.getId()) ? 
DEFAULT_LABEL_STRING : status.getId();
         final String componentName = StringUtils.isEmpty(status.getName()) ? 
DEFAULT_LABEL_STRING : status.getName();
 
-        // Clear all collectors to deal with removed/renamed components -- for 
root PG only
-        if("RootProcessGroup".equals(componentType)) {
-            try {
-                for (final Field field : 
PrometheusMetricsUtil.class.getDeclaredFields()) {
-                    if (Modifier.isStatic(field.getModifiers()) && 
(field.get(null) instanceof SimpleCollector)) {
-                        SimpleCollector<?> sc = (SimpleCollector<?>) 
(field.get(null));
-                        sc.clear();
-                    }
-                }
-            } catch (IllegalAccessException e) {
-                // ignore
-            }
-        }
-
         nifiMetricsRegistry.setDataPoint(status.getFlowFilesSent(), 
"AMOUNT_FLOWFILES_SENT", instanceId, componentType, componentName, componentId, 
parentPGId);
         nifiMetricsRegistry.setDataPoint(status.getFlowFilesTransferred(), 
"AMOUNT_FLOWFILES_TRANSFERRED", instanceId, componentType, componentName, 
componentId, parentPGId);
         nifiMetricsRegistry.setDataPoint(status.getFlowFilesReceived(), 
"AMOUNT_FLOWFILES_RECEIVED", instanceId, componentType, componentName, 
componentId, parentPGId);
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
index 7a13dd2..4c855b2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
@@ -432,5 +432,10 @@
             <groupId>org.slf4j</groupId>
             <artifactId>jcl-over-slf4j</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index b22a417..ee986dd 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -5587,9 +5587,10 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
 
     @Override
     public Collection<CollectorRegistry> generateFlowMetrics() {
-
         final String instanceId = 
StringUtils.isEmpty(controllerFacade.getInstanceId()) ? "" : 
controllerFacade.getInstanceId();
         ProcessGroupStatus rootPGStatus = 
controllerFacade.getProcessGroupStatus("root");
+
+        nifiMetricsRegistry.clear();
         PrometheusMetricsUtil.createNifiMetrics(nifiMetricsRegistry, 
rootPGStatus, instanceId, "", "RootProcessGroup",
                 PrometheusMetricsUtil.METRICS_STRATEGY_COMPONENTS.getValue());
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy
index a830a87..3018830 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/groovy/org/apache/nifi/web/StandardNiFiServiceFacadeSpec.groovy
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.web
 
+import io.prometheus.client.CollectorRegistry
+import io.prometheus.client.exporter.common.TextFormat
 import org.apache.nifi.authorization.AccessDeniedException
 import org.apache.nifi.authorization.AccessPolicy
 import org.apache.nifi.authorization.AuthorizableLookup
@@ -30,9 +32,19 @@ import org.apache.nifi.authorization.resource.ResourceFactory
 import org.apache.nifi.authorization.user.NiFiUser
 import org.apache.nifi.authorization.user.NiFiUserDetails
 import org.apache.nifi.authorization.user.StandardNiFiUser
+import org.apache.nifi.connectable.Connection
+import org.apache.nifi.controller.flow.StandardFlowManager
+import org.apache.nifi.controller.repository.FlowFileEvent
+import org.apache.nifi.controller.repository.FlowFileEventRepository
 import org.apache.nifi.controller.service.ControllerServiceProvider
+import org.apache.nifi.controller.status.PortStatus
+import org.apache.nifi.controller.status.ProcessGroupStatus
+import org.apache.nifi.controller.status.RunStatus
+import org.apache.nifi.diagnostics.StorageUsage
+import org.apache.nifi.diagnostics.SystemDiagnostics
 import org.apache.nifi.reporting.Bulletin
 import org.apache.nifi.reporting.BulletinRepository
+import org.apache.nifi.util.MockBulletinRepository
 import org.apache.nifi.web.api.dto.AccessPolicyDTO
 import org.apache.nifi.web.api.dto.BulletinDTO
 import org.apache.nifi.web.api.dto.DtoFactory
@@ -898,6 +910,151 @@ class StandardNiFiServiceFacadeSpec extends Specification 
{
 
     }
 
+    def "Test REST API Prometheus Metrics Endpoint"() {
+        given:
+        def serviceFacade = new StandardNiFiServiceFacade()
+        BulletinRepository bulletinRepository = new MockBulletinRepository()
+        serviceFacade.setBulletinRepository(bulletinRepository)
+
+        ControllerFacade controllerFacade = Mock()
+        serviceFacade.setControllerFacade(controllerFacade)
+        controllerFacade.getInstanceId() >> "ABC"
+        controllerFacade.getMaxEventDrivenThreadCount() >> 1
+        controllerFacade.getMaxTimerDrivenThreadCount() >> 10
+
+        // Setting up storage repositories
+        StorageUsage flowFileStorage = new StorageUsage()
+        flowFileStorage.setIdentifier("flowFile")
+        flowFileStorage.setTotalSpace(222)
+        flowFileStorage.setFreeSpace(111)
+
+        StorageUsage contentStorage = new StorageUsage()
+        contentStorage.setIdentifier("default")
+        contentStorage.setTotalSpace(444)
+        contentStorage.setFreeSpace(111)
+        Map<String, StorageUsage> contentStorageMap = new HashMap<>()
+        contentStorageMap.put("default", contentStorage)
+
+        StorageUsage provenanceStorage = new StorageUsage()
+        provenanceStorage.setIdentifier("default")
+        provenanceStorage.setTotalSpace(666)
+        provenanceStorage.setFreeSpace(111)
+        Map<String, StorageUsage> provenanceStorageMap = new HashMap<>()
+        provenanceStorageMap.put("default", provenanceStorage)
+
+        // Setting up SystemDiagnostics
+        SystemDiagnostics systemDiagnostics = new SystemDiagnostics()
+        systemDiagnostics.setFlowFileRepositoryStorageUsage(flowFileStorage)
+        systemDiagnostics.setContentRepositoryStorageUsage(contentStorageMap)
+        
systemDiagnostics.setProvenanceRepositoryStorageUsage(provenanceStorageMap)
+
+        controllerFacade.getSystemDiagnostics() >> systemDiagnostics
+
+        // Setting up flow
+        ProcessGroupStatus rootGroupStatus = new ProcessGroupStatus()
+        rootGroupStatus.setId("1234");
+        rootGroupStatus.setFlowFilesReceived(5);
+        rootGroupStatus.setBytesReceived(10000);
+        rootGroupStatus.setFlowFilesSent(10);
+        rootGroupStatus.setBytesSent(20000);
+        rootGroupStatus.setQueuedCount(100);
+        rootGroupStatus.setQueuedContentSize(1024L);
+        rootGroupStatus.setBytesRead(60000L);
+        rootGroupStatus.setBytesWritten(80000L);
+        rootGroupStatus.setActiveThreadCount(5);
+        rootGroupStatus.setName("root");
+        rootGroupStatus.setFlowFilesTransferred(5);
+        rootGroupStatus.setBytesTransferred(10000);
+        rootGroupStatus.setOutputContentSize(1000L);
+        rootGroupStatus.setInputContentSize(1000L);
+        rootGroupStatus.setOutputCount(100);
+        rootGroupStatus.setInputCount(1000);
+
+        PortStatus outputPortStatus = new PortStatus();
+        outputPortStatus.setId("9876");
+        outputPortStatus.setName("out");
+        outputPortStatus.setGroupId("1234");
+        outputPortStatus.setRunStatus(RunStatus.Stopped);
+        outputPortStatus.setActiveThreadCount(1);
+
+        
rootGroupStatus.setOutputPortStatus(Collections.singletonList(outputPortStatus));
+        // Create a nested group status
+        ProcessGroupStatus groupStatus2 = new ProcessGroupStatus();
+        groupStatus2.setFlowFilesReceived(5);
+        groupStatus2.setBytesReceived(10000);
+        groupStatus2.setFlowFilesSent(10);
+        groupStatus2.setBytesSent(20000);
+        groupStatus2.setQueuedCount(100);
+        groupStatus2.setQueuedContentSize(1024L);
+        groupStatus2.setActiveThreadCount(2);
+        groupStatus2.setBytesRead(12345L);
+        groupStatus2.setBytesWritten(11111L);
+        groupStatus2.setFlowFilesTransferred(5);
+        groupStatus2.setBytesTransferred(10000);
+        groupStatus2.setOutputContentSize(1000L);
+        groupStatus2.setInputContentSize(1000L);
+        groupStatus2.setOutputCount(100);
+        groupStatus2.setInputCount(1000);
+        groupStatus2.setId("3378");
+        groupStatus2.setName("nestedPG");
+        Collection<ProcessGroupStatus> nestedGroupStatuses = new ArrayList<>();
+        nestedGroupStatuses.add(groupStatus2);
+        rootGroupStatus.setProcessGroupStatus(nestedGroupStatuses);
+
+        // setting up flowFile events
+        controllerFacade.getProcessGroupStatus("root") >> rootGroupStatus
+        FlowFileEventRepository flowFileEventRepository = Mock()
+        controllerFacade.getFlowFileEventRepository() >> 
flowFileEventRepository
+        FlowFileEvent aggregateEvent = Mock()
+        flowFileEventRepository.reportAggregateEvent() >> aggregateEvent
+
+        // setting up connections (empty list for testing)
+        Set<Connection> connections = new HashSet()
+        StandardFlowManager flowManager = Mock()
+        controllerFacade.getFlowManager() >> flowManager
+        flowManager.findAllConnections() >> connections
+
+        when:
+        Collection<CollectorRegistry> allRegistries = 
serviceFacade.generateFlowMetrics()
+
+        // Converts metrics into a String for testing
+        Writer writer = new StringWriter();
+        for (CollectorRegistry collectorRegistry : allRegistries) {
+            TextFormat.write004(writer, 
collectorRegistry.metricFamilySamples());
+        }
+        String output = writer.toString();
+        writer.close()
+
+        // rename root group and generate metrics again to a different string
+        rootGroupStatus.setName("rootroot")
+        allRegistries = serviceFacade.generateFlowMetrics()
+        writer = new StringWriter()
+        for (CollectorRegistry collectorRegistry : allRegistries) {
+            TextFormat.write004(writer, 
collectorRegistry.metricFamilySamples())
+        }
+        String output2 = writer.toString()
+        writer.close()
+
+        then:
+        // flow metrics
+        
output.contains("nifi_amount_flowfiles_received{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",}
 5.0");
+        
output.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",}
 5.0");
+        
output.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"ProcessGroup\",component_name=\"nestedPG\",component_id=\"3378\",parent_id=\"1234\",}
 2.0");
+
+        // jvm
+        output.contains("nifi_jvm_heap_used{instance=\"ABC\",}")
+        output.contains("# HELP nifi_jvm_heap_used NiFi JVM heap used")
+        output.contains("# TYPE nifi_jvm_heap_used gauge")
+        output.contains("nifi_jvm_thread_count{instance=\"ABC\",}")
+
+        // test that renamed items are in the metrics output and that the 
previously named versions have been removed from the metrics output.
+        
output2.contains("nifi_amount_flowfiles_received{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",}
 5.0");
+        
output2.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",}
 5.0");
+        
!output2.contains("nifi_amount_flowfiles_received{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",}
 5.0");
+        
!output2.contains("nifi_amount_threads_active{instance=\"ABC\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",}
 5.0");
+
+    }
+
     private UserGroupDTO createUserGroupDTO() {
         new UserGroupDTO(id: 'group-1', name: 'test group', users: 
[createUserEntity()] as Set)
     }

Reply via email to