This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new dd7131b  NIFI-9435 Added registries and names include parameters to Flow Metrics
dd7131b is described below

commit dd7131b257113581a8485b2c59184c7eecc96ce6
Author: exceptionfactory <[email protected]>
AuthorDate: Fri Dec 3 15:30:11 2021 -0600

    NIFI-9435 Added registries and names include parameters to Flow Metrics
    
    - Added optional includedRegistries query parameter to Flow Metrics Resource method supporting one or more registries
    - Added optional includedNames query parameter to Flow Metrics Resource method supporting one or more metric family names
    - Added sampleName and sampleLabelValue optional pattern parameters
    - Added FilteringMetricFamilySamplesEnumeration to support streamed filtering
    - Added PrometheusMetricsWriter and TextFormat implementation
    
    Signed-off-by: Joe Gresock <[email protected]>
    
    This closes #5571.
---
 .../org/apache/nifi/web/NiFiServiceFacade.java     |   9 +
 .../apache/nifi/web/StandardNiFiServiceFacade.java |  36 +++-
 .../java/org/apache/nifi/web/api/FlowResource.java | 224 ++++++++-------------
 .../FilteringMetricFamilySamplesEnumeration.java   | 130 ++++++++++++
 .../web/api/metrics/PrometheusMetricsWriter.java   |  37 ++++
 .../metrics/TextFormatPrometheusMetricsWriter.java |  71 +++++++
 .../nifi/web/api/request/FlowMetricsProducer.java  |  34 ++++
 .../nifi/web/api/request/FlowMetricsRegistry.java  |  53 +++++
 .../org/apache/nifi/web/api/TestFlowResource.java  | 177 ++++++++++++++++
 9 files changed, 623 insertions(+), 148 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 10f13cd..c53627c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -130,6 +130,7 @@ import 
org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
 import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
+import org.apache.nifi.web.api.request.FlowMetricsRegistry;
 
 import java.util.Collection;
 import java.util.Date;
@@ -324,6 +325,14 @@ public interface NiFiServiceFacade {
     Collection<CollectorRegistry> generateFlowMetrics();
 
     /**
+     * Generate metrics for the flow and return selected registries
+     *
+     * @param includeRegistries Set of Flow Metrics Registries to be returned
+     * @return Collector Registries
+     */
+    Collection<CollectorRegistry> generateFlowMetrics(Set<FlowMetricsRegistry> 
includeRegistries);
+
+    /**
      * Updates the configuration for this controller.
      *
      * @param revision Revision to compare with current base revision
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 7bf6dd7..7b734ef 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -120,6 +120,7 @@ import org.apache.nifi.parameter.ParameterLookup;
 import org.apache.nifi.parameter.ParameterReferenceManager;
 import org.apache.nifi.parameter.StandardParameterContext;
 import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.prometheus.util.AbstractMetricsRegistry;
 import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
 import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
 import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
@@ -308,6 +309,7 @@ import 
org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
 import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
+import org.apache.nifi.web.api.request.FlowMetricsRegistry;
 import org.apache.nifi.web.controller.ControllerFacade;
 import org.apache.nifi.web.dao.AccessPolicyDAO;
 import org.apache.nifi.web.dao.ConnectionDAO;
@@ -418,12 +420,19 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     private AuthorizableLookup authorizableLookup;
 
     // Prometheus Metrics objects
-    private NiFiMetricsRegistry nifiMetricsRegistry = new 
NiFiMetricsRegistry();
-    private JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
-    private ConnectionAnalyticsMetricsRegistry 
connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
-    private BulletinMetricsRegistry bulletinMetricsRegistry = new 
BulletinMetricsRegistry();
+    private final NiFiMetricsRegistry nifiMetricsRegistry = new 
NiFiMetricsRegistry();
+    private final JvmMetricsRegistry jvmMetricsRegistry = new 
JvmMetricsRegistry();
+    private final ConnectionAnalyticsMetricsRegistry 
connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
+    private final BulletinMetricsRegistry bulletinMetricsRegistry = new 
BulletinMetricsRegistry();
+
+    private final Collection<AbstractMetricsRegistry> configuredRegistries = 
Arrays.asList(
+            nifiMetricsRegistry,
+            jvmMetricsRegistry,
+            connectionAnalyticsMetricsRegistry,
+            bulletinMetricsRegistry
+    );
 
-    public final Collection<CollectorRegistry> ALL_REGISTRIES = Arrays.asList(
+    private final Collection<CollectorRegistry> metricsRegistries = 
Arrays.asList(
             nifiMetricsRegistry.getRegistry(),
             jvmMetricsRegistry.getRegistry(),
             connectionAnalyticsMetricsRegistry.getRegistry(),
@@ -5654,7 +5663,22 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
                 );
             }
         }
-        return ALL_REGISTRIES;
+        return metricsRegistries;
+    }
+
+    @Override
+    public Collection<CollectorRegistry> generateFlowMetrics(final 
Set<FlowMetricsRegistry> includeRegistries) {
+        final Set<FlowMetricsRegistry> selectedRegistries = 
includeRegistries.isEmpty() ? new 
HashSet<>(Arrays.asList(FlowMetricsRegistry.values())) : includeRegistries;
+
+        final Set<Class<? extends AbstractMetricsRegistry>> registryClasses = 
selectedRegistries.stream()
+                .map(FlowMetricsRegistry::getRegistryClass)
+                .collect(Collectors.toSet());
+
+        generateFlowMetrics();
+        return configuredRegistries.stream()
+                .filter(configuredRegistry -> 
registryClasses.contains(configuredRegistry.getClass()))
+                .map(AbstractMetricsRegistry::getRegistry)
+                .collect(Collectors.toList());
     }
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 35a7c81..79a31d0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -111,8 +111,12 @@ import org.apache.nifi.web.api.entity.VersionedFlowEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataSetEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowsEntity;
+import org.apache.nifi.web.api.metrics.TextFormatPrometheusMetricsWriter;
+import org.apache.nifi.web.api.metrics.PrometheusMetricsWriter;
 import org.apache.nifi.web.api.request.BulletinBoardPatternParameter;
 import org.apache.nifi.web.api.request.DateTimeParameter;
+import org.apache.nifi.web.api.request.FlowMetricsProducer;
+import org.apache.nifi.web.api.request.FlowMetricsRegistry;
 import org.apache.nifi.web.api.request.IntegerParameter;
 import org.apache.nifi.web.api.request.LongParameter;
 
@@ -131,13 +135,10 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.StreamingOutput;
-import java.io.BufferedWriter;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
 import java.text.Collator;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
+import java.util.Collections;
 import java.util.Date;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -190,9 +191,8 @@ public class FlowResource extends ApplicationResource {
      * Populates the remaining fields in the specified process group.
      *
      * @param flow group
-     * @return group dto
      */
-    private ProcessGroupFlowDTO 
populateRemainingFlowContent(ProcessGroupFlowDTO flow) {
+    private void populateRemainingFlowContent(ProcessGroupFlowDTO flow) {
         FlowDTO flowStructure = flow.getFlow();
 
         // populate the remaining fields for the processors, connections, 
process group refs, remote process groups, and labels if appropriate
@@ -202,14 +202,12 @@ public class FlowResource extends ApplicationResource {
 
         // set the process group uri
         flow.setUri(generateResourceUri("flow", "process-groups", 
flow.getId()));
-
-        return flow;
     }
 
     /**
      * Populates the remaining content of the specified snippet.
      */
-    private FlowDTO populateRemainingFlowStructure(FlowDTO flowStructure) {
+    private void populateRemainingFlowStructure(FlowDTO flowStructure) {
         
processorResource.populateRemainingProcessorEntitiesContent(flowStructure.getProcessors());
         
connectionResource.populateRemainingConnectionEntitiesContent(flowStructure.getConnections());
         
inputPortResource.populateRemainingInputPortEntitiesContent(flowStructure.getInputPorts());
@@ -226,8 +224,6 @@ public class FlowResource extends ApplicationResource {
                 processGroup.setContents(null);
             }
         }
-
-        return flowStructure;
     }
 
     /**
@@ -371,8 +367,7 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getFlow(
             @ApiParam(
-                    value = "The process group id.",
-                    required = false
+                    value = "The process group id."
             )
             @PathParam("id") final String groupId,
             @QueryParam("uiOnly") @DefaultValue("false") final boolean uiOnly) 
{
@@ -393,7 +388,6 @@ public class FlowResource extends ApplicationResource {
      * Retrieves the metrics of the entire flow.
      *
      * @return A flowMetricsEntity.
-     * @throws InterruptedException if interrupted
      */
     @GET
     @Consumes(MediaType.WILDCARD)
@@ -418,30 +412,36 @@ public class FlowResource extends ApplicationResource {
     public Response getFlowMetrics(
             @ApiParam(
                     value = "The producer for flow file metrics. Each producer 
may have its own output format.",
-                    required = true
+                    required = true,
+                    allowableValues = "prometheus"
+            )
+            @PathParam("producer") final String producer,
+            @ApiParam(
+                    value = "Set of included metrics registries",
+                    allowableValues = "NIFI,JVM,BULLETIN,CONNECTION"
+            )
+            @QueryParam("includedRegistries") final Set<FlowMetricsRegistry> 
includedRegistries,
+            @ApiParam(
+                    value = "Regular Expression Pattern to be applied against 
the sample name field"
             )
-            @PathParam("producer") final String producer) throws 
InterruptedException {
+            @QueryParam("sampleName") final String sampleName,
+            @ApiParam(
+                    value = "Regular Expression Pattern to be applied against 
the sample label value field"
+            )
+            @QueryParam("sampleLabelValue") final String sampleLabelValue
+    ) {
 
         authorizeFlow();
 
-        if ("prometheus".equalsIgnoreCase(producer)) {
-            // get this process group flow
-            final Collection<CollectorRegistry> allRegistries = 
serviceFacade.generateFlowMetrics();
-            // generate a streaming response
-            final StreamingOutput response = output -> {
-                Writer writer = new BufferedWriter(new 
OutputStreamWriter(output));
-                for (CollectorRegistry collectorRegistry : allRegistries) {
-                    TextFormat.write004(writer, 
collectorRegistry.metricFamilySamples());
-                    // flush the response
-                    output.flush();
-                }
-                writer.flush();
-                writer.close();
-            };
+        final Set<FlowMetricsRegistry> selectedRegistries = includedRegistries 
== null ? Collections.emptySet() : includedRegistries;
+        final Collection<CollectorRegistry> registries = 
serviceFacade.generateFlowMetrics(selectedRegistries);
 
-            return generateOkResponse(response)
-                    .type(MediaType.TEXT_PLAIN_TYPE)
-                    .build();
+        if 
(FlowMetricsProducer.PROMETHEUS.getProducer().equalsIgnoreCase(producer)) {
+            final StreamingOutput response = (outputStream -> {
+                final PrometheusMetricsWriter prometheusMetricsWriter = new 
TextFormatPrometheusMetricsWriter(sampleName, sampleLabelValue);
+                prometheusMetricsWriter.write(registries, outputStream);
+            });
+            return 
generateOkResponse(response).type(TextFormat.CONTENT_TYPE_004).build();
         } else {
             throw new ResourceNotFoundException("The specified producer is 
missing or invalid.");
         }
@@ -500,7 +500,6 @@ public class FlowResource extends ApplicationResource {
      * Retrieves all the of controller services in this NiFi.
      *
      * @return A controllerServicesEntity.
-     * @throws InterruptedException if interrupted
      */
     @GET
     @Consumes(MediaType.WILDCARD)
@@ -525,7 +524,7 @@ public class FlowResource extends ApplicationResource {
             @ApiParam(value = "The process group id.", required = true) 
@PathParam("id") String groupId,
             @ApiParam("Whether or not to include parent/ancestory process 
groups") @QueryParam("includeAncestorGroups") @DefaultValue("true") boolean 
includeAncestorGroups,
             @ApiParam("Whether or not to include descendant process groups") 
@QueryParam("includeDescendantGroups") @DefaultValue("false") boolean 
includeDescendantGroups
-            ) throws InterruptedException {
+            ) {
 
         authorizeFlow();
 
@@ -705,25 +704,19 @@ public class FlowResource extends ApplicationResource {
                 group.findAllProcessors().stream()
                         .filter(getProcessorFilter.get())
                         .filter(processor -> 
OperationAuthorizable.isOperationAuthorized(processor, authorizer, 
NiFiUserUtils.getNiFiUser()))
-                        .forEach(processor -> {
-                            componentIds.add(processor.getIdentifier());
-                        });
+                        .forEach(processor -> 
componentIds.add(processor.getIdentifier()));
 
                 // ensure authorized for each input port we will attempt to 
schedule
                 group.findAllInputPorts().stream()
                     .filter(getPortFilter.get())
                         .filter(inputPort -> 
OperationAuthorizable.isOperationAuthorized(inputPort, authorizer, 
NiFiUserUtils.getNiFiUser()))
-                        .forEach(inputPort -> {
-                            componentIds.add(inputPort.getIdentifier());
-                        });
+                        .forEach(inputPort -> 
componentIds.add(inputPort.getIdentifier()));
 
                 // ensure authorized for each output port we will attempt to 
schedule
                 group.findAllOutputPorts().stream()
                         .filter(getPortFilter.get())
                         .filter(outputPort -> 
OperationAuthorizable.isOperationAuthorized(outputPort, authorizer, 
NiFiUserUtils.getNiFiUser()))
-                        .forEach(outputPort -> {
-                            componentIds.add(outputPort.getIdentifier());
-                        });
+                        .forEach(outputPort -> 
componentIds.add(outputPort.getIdentifier()));
 
                 return componentIds;
             });
@@ -936,7 +929,6 @@ public class FlowResource extends ApplicationResource {
      *
      * @param value Search string
      * @return A searchResultsEntity
-     * @throws InterruptedException if interrupted
      */
     @GET
     @Consumes(MediaType.WILDCARD)
@@ -961,7 +953,7 @@ public class FlowResource extends ApplicationResource {
     public Response searchFlow(
             @QueryParam("q") @DefaultValue(StringUtils.EMPTY) String value,
             @QueryParam("a") @DefaultValue(StringUtils.EMPTY) String 
activeGroupId
-    ) throws InterruptedException {
+    ) {
         authorizeFlow();
 
         // query the controller
@@ -1022,7 +1014,6 @@ public class FlowResource extends ApplicationResource {
      * Retrieves the cluster summary for this NiFi.
      *
      * @return A clusterSummaryEntity.
-     * @throws InterruptedException if interrupted
      */
     @GET
     @Consumes(MediaType.WILDCARD)
@@ -1043,7 +1034,7 @@ public class FlowResource extends ApplicationResource {
                     @ApiResponse(code = 409, message = "The request was valid 
but NiFi was not in the appropriate state to process it. Retrying the same 
request later may be successful.")
             }
     )
-    public Response getClusterSummary() throws InterruptedException {
+    public Response getClusterSummary() {
 
         authorizeFlow();
 
@@ -1187,18 +1178,15 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getProcessorTypes(
             @ApiParam(
-                value = "If specified, will only return types that are a 
member of this bundle group.",
-                required = false
+                value = "If specified, will only return types that are a 
member of this bundle group."
             )
             @QueryParam("bundleGroupFilter") String bundleGroupFilter,
             @ApiParam(
-                    value = "If specified, will only return types that are a 
member of this bundle artifact.",
-                    required = false
+                    value = "If specified, will only return types that are a 
member of this bundle artifact."
             )
             @QueryParam("bundleArtifactFilter") String bundleArtifactFilter,
             @ApiParam(
-                    value = "If specified, will only return types whose fully 
qualified classname matches.",
-                    required = false
+                    value = "If specified, will only return types whose fully 
qualified classname matches."
             )
             @QueryParam("type") String typeFilter) throws InterruptedException 
{
 
@@ -1245,38 +1233,31 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getControllerServiceTypes(
             @ApiParam(
-                    value = "If specified, will only return controller 
services that are compatible with this type of service.",
-                    required = false
+                    value = "If specified, will only return controller 
services that are compatible with this type of service."
             )
             @QueryParam("serviceType") String serviceType,
             @ApiParam(
-                    value = "If serviceType specified, is the bundle group of 
the serviceType.",
-                    required = false
+                    value = "If serviceType specified, is the bundle group of 
the serviceType."
             )
             @QueryParam("serviceBundleGroup") String serviceBundleGroup,
             @ApiParam(
-                    value = "If serviceType specified, is the bundle artifact 
of the serviceType.",
-                    required = false
+                    value = "If serviceType specified, is the bundle artifact 
of the serviceType."
             )
             @QueryParam("serviceBundleArtifact") String serviceBundleArtifact,
             @ApiParam(
-                    value = "If serviceType specified, is the bundle version 
of the serviceType.",
-                    required = false
+                    value = "If serviceType specified, is the bundle version 
of the serviceType."
             )
             @QueryParam("serviceBundleVersion") String serviceBundleVersion,
             @ApiParam(
-                    value = "If specified, will only return types that are a 
member of this bundle group.",
-                    required = false
+                    value = "If specified, will only return types that are a 
member of this bundle group."
             )
             @QueryParam("bundleGroupFilter") String bundleGroupFilter,
             @ApiParam(
-                    value = "If specified, will only return types that are a 
member of this bundle artifact.",
-                    required = false
+                    value = "If specified, will only return types that are a 
member of this bundle artifact."
             )
             @QueryParam("bundleArtifactFilter") String bundleArtifactFilter,
             @ApiParam(
-                    value = "If specified, will only return types whose fully 
qualified classname matches.",
-                    required = false
+                    value = "If specified, will only return types whose fully 
qualified classname matches."
             )
             @QueryParam("typeFilter") String typeFilter) throws 
InterruptedException {
 
@@ -1329,18 +1310,15 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getReportingTaskTypes(
             @ApiParam(
-                    value = "If specified, will only return types that are a 
member of this bundle group.",
-                    required = false
+                    value = "If specified, will only return types that are a 
member of this bundle group."
             )
             @QueryParam("bundleGroupFilter") String bundleGroupFilter,
             @ApiParam(
-                    value = "If specified, will only return types that are a 
member of this bundle artifact.",
-                    required = false
+                    value = "If specified, will only return types that are a 
member of this bundle artifact."
             )
             @QueryParam("bundleArtifactFilter") String bundleArtifactFilter,
             @ApiParam(
-                    value = "If specified, will only return types whose fully 
qualified classname matches.",
-                    required = false
+                    value = "If specified, will only return types whose fully 
qualified classname matches."
             )
             @QueryParam("type") String typeFilter) throws InterruptedException 
{
 
@@ -1520,12 +1498,7 @@ public class FlowResource extends ApplicationResource {
     }
 
     private SortedSet<BucketEntity> sortBuckets(final Set<BucketEntity> 
buckets) {
-        final SortedSet<BucketEntity> sortedBuckets = new TreeSet<>(new 
Comparator<BucketEntity>() {
-            @Override
-            public int compare(final BucketEntity entity1, final BucketEntity 
entity2) {
-                return Collator.getInstance().compare(getBucketName(entity1), 
getBucketName(entity2));
-            }
-        });
+        final SortedSet<BucketEntity> sortedBuckets = new TreeSet<>((entity1, 
entity2) -> Collator.getInstance().compare(getBucketName(entity1), 
getBucketName(entity2)));
 
         sortedBuckets.addAll(buckets);
         return sortedBuckets;
@@ -1573,12 +1546,7 @@ public class FlowResource extends ApplicationResource {
     }
 
     private SortedSet<VersionedFlowEntity> sortFlows(final 
Set<VersionedFlowEntity> versionedFlows) {
-        final SortedSet<VersionedFlowEntity> sortedFlows = new TreeSet<>(new 
Comparator<VersionedFlowEntity>() {
-            @Override
-            public int compare(final VersionedFlowEntity entity1, final 
VersionedFlowEntity entity2) {
-                return Collator.getInstance().compare(getFlowName(entity1), 
getFlowName(entity2));
-            }
-        });
+        final SortedSet<VersionedFlowEntity> sortedFlows = new 
TreeSet<>((entity1, entity2) -> 
Collator.getInstance().compare(getFlowName(entity1), getFlowName(entity2)));
 
         sortedFlows.addAll(versionedFlows);
         return sortedFlows;
@@ -1670,33 +1638,27 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getBulletinBoard(
             @ApiParam(
-                    value = "Includes bulletins with an id after this value.",
-                    required = false
+                    value = "Includes bulletins with an id after this value."
             )
             @QueryParam("after") LongParameter after,
             @ApiParam(
-                    value = "Includes bulletins originating from this sources 
whose name match this regular expression.",
-                    required = false
+                    value = "Includes bulletins originating from this sources 
whose name match this regular expression."
             )
             @QueryParam("sourceName") BulletinBoardPatternParameter sourceName,
             @ApiParam(
-                    value = "Includes bulletins whose message that match this 
regular expression.",
-                    required = false
+                    value = "Includes bulletins whose message that match this 
regular expression."
             )
             @QueryParam("message") BulletinBoardPatternParameter message,
             @ApiParam(
-                    value = "Includes bulletins originating from this sources 
whose id match this regular expression.",
-                    required = false
+                    value = "Includes bulletins originating from this sources 
whose id match this regular expression."
             )
             @QueryParam("sourceId") BulletinBoardPatternParameter sourceId,
             @ApiParam(
-                    value = "Includes bulletins originating from this sources 
whose group id match this regular expression.",
-                    required = false
+                    value = "Includes bulletins originating from this sources 
whose group id match this regular expression."
             )
             @QueryParam("groupId") BulletinBoardPatternParameter groupId,
             @ApiParam(
-                    value = "The number of bulletins to limit the response 
to.",
-                    required = false
+                    value = "The number of bulletins to limit the response to."
             )
             @QueryParam("limit") IntegerParameter limit) throws 
InterruptedException {
 
@@ -1773,13 +1735,11 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getProcessorStatus(
             @ApiParam(
-                    value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
-                    required = false
+                    value = "Whether or not to include the breakdown per node. 
Optional, defaults to false"
             )
             @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
             @ApiParam(
-                    value = "The id of the node where to get the status.",
-                    required = false
+                    value = "The id of the node where to get the status."
             )
             @QueryParam("clusterNodeId") String clusterNodeId,
             @ApiParam(
@@ -1846,13 +1806,11 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getInputPortStatus(
             @ApiParam(
-                    value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
-                    required = false
+                    value = "Whether or not to include the breakdown per node. 
Optional, defaults to false"
             )
             @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
             @ApiParam(
-                    value = "The id of the node where to get the status.",
-                    required = false
+                    value = "The id of the node where to get the status."
             )
             @QueryParam("clusterNodeId") String clusterNodeId,
             @ApiParam(
@@ -1919,13 +1877,11 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getOutputPortStatus(
             @ApiParam(
-                    value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
-                    required = false
+                    value = "Whether or not to include the breakdown per node. 
Optional, defaults to false"
             )
             @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
             @ApiParam(
-                    value = "The id of the node where to get the status.",
-                    required = false
+                    value = "The id of the node where to get the status."
             )
             @QueryParam("clusterNodeId") String clusterNodeId,
             @ApiParam(
@@ -1992,18 +1948,15 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getRemoteProcessGroupStatus(
             @ApiParam(
-                    value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
-                    required = false
+                    value = "Whether or not to include the breakdown per node. 
Optional, defaults to false"
             )
             @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
             @ApiParam(
-                    value = "The id of the node where to get the status.",
-                    required = false
+                    value = "The id of the node where to get the status."
             )
             @QueryParam("clusterNodeId") String clusterNodeId,
             @ApiParam(
-                    value = "The remote process group id.",
-                    required = true
+                    value = "The remote process group id."
             )
             @PathParam("id") String id) throws InterruptedException {
 
@@ -2068,18 +2021,15 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getProcessGroupStatus(
             @ApiParam(
-                    value = "Whether all descendant groups and the status of 
their content will be included. Optional, defaults to false",
-                    required = false
+                    value = "Whether all descendant groups and the status of 
their content will be included. Optional, defaults to false"
             )
             @QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean 
recursive,
             @ApiParam(
-                    value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
-                    required = false
+                    value = "Whether or not to include the breakdown per node. 
Optional, defaults to false"
             )
             @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
             @ApiParam(
-                    value = "The id of the node where to get the status.",
-                    required = false
+                    value = "The id of the node where to get the status."
             )
             @QueryParam("clusterNodeId") String clusterNodeId,
             @ApiParam(
@@ -2146,13 +2096,11 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getConnectionStatus(
             @ApiParam(
-                    value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
-                    required = false
+                    value = "Whether or not to include the breakdown per node. 
Optional, defaults to false"
             )
             @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
             @ApiParam(
-                    value = "The id of the node where to get the status.",
-                    required = false
+                    value = "The id of the node where to get the status."
             )
             @QueryParam("clusterNodeId") String clusterNodeId,
             @ApiParam(
@@ -2219,13 +2167,11 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getConnectionStatistics(
             @ApiParam(
-                    value = "Whether or not to include the breakdown per node. 
Optional, defaults to false",
-                    required = false
+                    value = "Whether or not to include the breakdown per node. 
Optional, defaults to false"
             )
             @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
             @ApiParam(
-                    value = "The id of the node where to get the statistics.",
-                    required = false
+                    value = "The id of the node where to get the statistics."
             )
             @QueryParam("clusterNodeId") String clusterNodeId,
             @ApiParam(
@@ -2550,33 +2496,27 @@ public class FlowResource extends ApplicationResource {
             )
             @QueryParam("count") IntegerParameter count,
             @ApiParam(
-                    value = "The field to sort on.",
-                    required = false
+                    value = "The field to sort on."
             )
             @QueryParam("sortColumn") String sortColumn,
             @ApiParam(
-                    value = "The direction to sort.",
-                    required = false
+                    value = "The direction to sort."
             )
             @QueryParam("sortOrder") String sortOrder,
             @ApiParam(
-                    value = "Include actions after this date.",
-                    required = false
+                    value = "Include actions after this date."
             )
             @QueryParam("startDate") DateTimeParameter startDate,
             @ApiParam(
-                    value = "Include actions before this date.",
-                    required = false
+                    value = "Include actions before this date."
             )
             @QueryParam("endDate") DateTimeParameter endDate,
             @ApiParam(
-                    value = "Include actions performed by this user.",
-                    required = false
+                    value = "Include actions performed by this user."
             )
             @QueryParam("userIdentity") String userIdentity,
             @ApiParam(
-                    value = "Include actions on this component.",
-                    required = false
+                    value = "Include actions on this component."
             )
             @QueryParam("sourceId") String sourceId) {
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/FilteringMetricFamilySamplesEnumeration.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/FilteringMetricFamilySamplesEnumeration.java
new file mode 100644
index 0000000..f9f2e40
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/FilteringMetricFamilySamplesEnumeration.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.metrics;
+
+import io.prometheus.client.Collector;
+
+import java.util.Enumeration;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Enumeration wrapping Prometheus Collector Metric Family Samples with filtering of individual Samples
+ * based on optional Sample name and Sample label value patterns.
+ *
+ * Samples that are not matched are removed from the samples list of each wrapped element, and elements
+ * left without any Samples are skipped.
+ */
+public class FilteringMetricFamilySamplesEnumeration implements Enumeration<Collector.MetricFamilySamples> {
+    private final Enumeration<Collector.MetricFamilySamples> metricFamilySamples;
+
+    private final Pattern sampleNamePattern;
+
+    private final Pattern sampleLabelValuePattern;
+
+    private Collector.MetricFamilySamples nextElement;
+
+    /**
+     * Filtering Metric Family Samples Enumeration with required properties
+     *
+     * @param metricFamilySamples Metric Family Samples to be filtered, must not be null
+     * @param sampleNamePattern Pattern that must match the entire Sample.name field, null disables filtering on name
+     * @param sampleLabelValuePattern Pattern that must match at least one entire Sample.labelValues entry, null disables filtering on label values
+     */
+    public FilteringMetricFamilySamplesEnumeration(
+            final Enumeration<Collector.MetricFamilySamples> metricFamilySamples,
+            final Pattern sampleNamePattern,
+            final Pattern sampleLabelValuePattern
+    ) {
+        this.metricFamilySamples = Objects.requireNonNull(metricFamilySamples);
+        this.sampleNamePattern = sampleNamePattern;
+        this.sampleLabelValuePattern = sampleLabelValuePattern;
+        // Look ahead immediately so hasMoreElements() is accurate before the first nextElement() call
+        setNextElement();
+    }
+
+    /**
+     * Has More Elements based on whether the next element is set from a previous operation
+     *
+     * @return More Elements status
+     */
+    @Override
+    public boolean hasMoreElements() {
+        return nextElement != null;
+    }
+
+    /**
+     * Get Next Element and set next available element before returning
+     *
+     * @return Next Element based on applied filters
+     * @throws NoSuchElementException Thrown when no element with matching Samples remains
+     */
+    @Override
+    public Collector.MetricFamilySamples nextElement() {
+        if (nextElement == null) {
+            throw new NoSuchElementException();
+        }
+        final Collector.MetricFamilySamples currentElement = nextElement;
+        setNextElement();
+        return currentElement;
+    }
+
+    /**
+     * Set Next Element to the next wrapped element retaining at least one matching Sample, or null when exhausted
+     */
+    private void setNextElement() {
+        nextElement = null;
+        while (nextElement == null && metricFamilySamples.hasMoreElements()) {
+            final Collector.MetricFamilySamples possibleNextElement = metricFamilySamples.nextElement();
+            // Filters the samples list in place; assumes the wrapped list supports removal
+            possibleNextElement.samples.removeIf(this::isSampleNotMatched);
+            if (!possibleNextElement.samples.isEmpty()) {
+                nextElement = possibleNextElement;
+            }
+        }
+    }
+
+    /**
+     * Determine whether a Sample should be removed based on the configured patterns
+     *
+     * @param sample Sample to be evaluated
+     * @return Not matched status, false when no patterns are configured
+     */
+    private boolean isSampleNotMatched(final Collector.MetricFamilySamples.Sample sample) {
+        if (sampleNamePattern == null && sampleLabelValuePattern == null) {
+            // No filters configured: retain every Sample instead of dereferencing a null Pattern
+            return false;
+        }
+        if (sampleNamePattern == null) {
+            return isSampleLabelValueNotMatched(sample);
+        }
+        if (sampleLabelValuePattern == null) {
+            return isSampleNameNotMatched(sample);
+        }
+        // Both filters configured: a Sample is retained when either the name or a label value matches
+        return isSampleNameNotMatched(sample) && isSampleLabelValueNotMatched(sample);
+    }
+
+    private boolean isSampleNameNotMatched(final Collector.MetricFamilySamples.Sample sample) {
+        final Matcher sampleNameMatcher = sampleNamePattern.matcher(sample.name);
+        return !sampleNameMatcher.matches();
+    }
+
+    private boolean isSampleLabelValueNotMatched(final Collector.MetricFamilySamples.Sample sample) {
+        for (final String labelValue : sample.labelValues) {
+            final Matcher sampleLabelValueMatcher = sampleLabelValuePattern.matcher(labelValue);
+            if (sampleLabelValueMatcher.matches()) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/PrometheusMetricsWriter.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/PrometheusMetricsWriter.java
new file mode 100644
index 0000000..cdc2926
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/PrometheusMetricsWriter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.metrics;
+
+import io.prometheus.client.CollectorRegistry;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
+
+/**
+ * Prometheus Metrics Writer abstracting how Prometheus Collector Registries are serialized to an Output Stream
+ */
+public interface PrometheusMetricsWriter {
+    /**
+     * Write collection of metrics registries to provided stream
+     *
+     * NOTE(review): this contract does not state whether implementations close the provided stream;
+     * confirm against the implementation in use before writing further data to the stream.
+     *
+     * @param registries Collector Registries containing the metrics to be written
+     * @param outputStream Output Stream receiving the serialized metrics
+     * @throws IOException Thrown on failure to write metrics
+     */
+    void write(Collection<CollectorRegistry> registries, OutputStream 
outputStream) throws IOException;
+}
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
new file mode 100644
index 0000000..9631fd4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.metrics;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.regex.Pattern;
+
+/**
+ * Prometheus Metrics Writer supporting Prometheus Text Version 0.0.4 with optional filtering
+ */
+public class TextFormatPrometheusMetricsWriter implements PrometheusMetricsWriter {
+    private final Pattern sampleNamePattern;
+
+    private final Pattern sampleLabelValuePattern;
+
+    private final boolean filteringDisabled;
+
+    /**
+     * Text Format Prometheus Metrics Writer with optional filter expressions
+     *
+     * @param sampleName Regular expression matched against Sample names, null or blank disables filtering on name
+     * @param sampleLabelValue Regular expression matched against Sample label values, null or blank disables filtering on label values
+     * @throws java.util.regex.PatternSyntaxException Thrown when a provided expression is not a valid regular expression
+     */
+    public TextFormatPrometheusMetricsWriter(
+            final String sampleName,
+            final String sampleLabelValue
+    ) {
+        this.sampleNamePattern = StringUtils.isBlank(sampleName) ? null : Pattern.compile(sampleName);
+        this.sampleLabelValuePattern = StringUtils.isBlank(sampleLabelValue) ? null : Pattern.compile(sampleLabelValue);
+        this.filteringDisabled = StringUtils.isAllBlank(sampleName, sampleLabelValue);
+    }
+
+    /**
+     * Write metrics from each registry to the provided stream using Prometheus Text Version 0.0.4 encoded as UTF-8
+     *
+     * The provided stream is closed after all registries have been written.
+     *
+     * @param registries Collector Registries
+     * @param outputStream Output Stream
+     * @throws IOException Thrown on failure to write metrics
+     */
+    @Override
+    public void write(final Collection<CollectorRegistry> registries, final OutputStream outputStream) throws IOException {
+        // Prometheus Text Version 0.0.4 declares charset=utf-8, so encode explicitly instead of relying on the platform default charset
+        try (final Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) {
+            for (final CollectorRegistry collectorRegistry : registries) {
+                final Enumeration<Collector.MetricFamilySamples> samples = getSamples(collectorRegistry);
+                TextFormat.write004(writer, samples);
+                // Flush after each registry so completed output is streamed before collecting the next registry
+                writer.flush();
+            }
+        }
+    }
+
+    /**
+     * Get Samples from the registry wrapped with filtering unless both filter expressions were blank
+     */
+    private Enumeration<Collector.MetricFamilySamples> getSamples(final CollectorRegistry registry) {
+        final Enumeration<Collector.MetricFamilySamples> samples = registry.metricFamilySamples();
+        if (filteringDisabled) {
+            return samples;
+        }
+        return new FilteringMetricFamilySamplesEnumeration(samples, sampleNamePattern, sampleLabelValuePattern);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsProducer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsProducer.java
new file mode 100644
index 0000000..e34c2cd
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsProducer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.request;
+
+/**
+ * Supported producers of Flow Metrics, each identified by the producer name it exposes
+ */
+public enum FlowMetricsProducer {
+    PROMETHEUS("prometheus");
+
+    private final String producer;
+
+    FlowMetricsProducer(final String producerName) {
+        producer = producerName;
+    }
+
+    /**
+     * Get producer name associated with this enumerated value
+     *
+     * @return Producer name
+     */
+    public String getProducer() {
+        return this.producer;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsRegistry.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsRegistry.java
new file mode 100644
index 0000000..c827ae8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsRegistry.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.request;
+
+import org.apache.nifi.prometheus.util.AbstractMetricsRegistry;
+import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
+import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
+import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
+import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
+
+/**
+ * Flow Metrics Registries pairing each registry name with its Metrics Registry class
+ */
+public enum FlowMetricsRegistry {
+    NIFI("NIFI", NiFiMetricsRegistry.class),
+
+    JVM("JVM", JvmMetricsRegistry.class),
+
+    BULLETIN("BULLETIN", BulletinMetricsRegistry.class),
+
+    CONNECTION("CONNECTION", ConnectionAnalyticsMetricsRegistry.class);
+
+    private final String registry;
+
+    private final Class<? extends AbstractMetricsRegistry> registryClass;
+
+    FlowMetricsRegistry(final String registryName, final Class<? extends AbstractMetricsRegistry> metricsRegistryClass) {
+        registry = registryName;
+        registryClass = metricsRegistryClass;
+    }
+
+    /**
+     * Get registry name associated with this enumerated value
+     *
+     * @return Registry name
+     */
+    public String getRegistry() {
+        return this.registry;
+    }
+
+    /**
+     * Get Metrics Registry class associated with this enumerated value
+     *
+     * @return Metrics Registry class
+     */
+    public Class<? extends AbstractMetricsRegistry> getRegistryClass() {
+        return this.registryClass;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
new file mode 100644
index 0000000..bc7fdf1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
+import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.ResourceNotFoundException;
+import org.apache.nifi.web.api.request.FlowMetricsProducer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class TestFlowResource {
+    private static final String LABEL_VALUE = TestFlowResource.class.getSimpleName();
+
+    private static final String OTHER_LABEL_VALUE = JmxJvmMetrics.class.getSimpleName();
+
+    private static final String THREAD_COUNT_NAME = "nifi_jvm_thread_count";
+
+    private static final String HEAP_USAGE_NAME = "nifi_jvm_heap_usage";
+
+    private static final String HEAP_USED_NAME = "nifi_jvm_heap_used";
+
+    private static final String HEAP_STARTS_WITH_PATTERN = "nifi_jvm_heap.*";
+
+    private static final String THREAD_COUNT_LABEL = String.format("nifi_jvm_thread_count{instance=\"%s\"", LABEL_VALUE);
+
+    private static final String THREAD_COUNT_OTHER_LABEL = String.format("nifi_jvm_thread_count{instance=\"%s\"", OTHER_LABEL_VALUE);
+
+    @InjectMocks
+    private FlowResource resource = new FlowResource();
+
+    @Mock
+    private NiFiServiceFacade serviceFacade;
+
+    @Test
+    public void testGetFlowMetricsProducerInvalid() {
+        assertThrows(ResourceNotFoundException.class, () -> resource.getFlowMetrics(String.class.toString(), Collections.emptySet(), null, null));
+    }
+
+    @Test
+    public void testGetFlowMetricsPrometheus() throws IOException {
+        final String output = getPrometheusOutput(null, null);
+
+        assertTrue(output.contains(THREAD_COUNT_NAME), "Thread Count name not found");
+        assertTrue(output.contains(HEAP_USAGE_NAME), "Heap Usage name not found");
+    }
+
+    @Test
+    public void testGetFlowMetricsPrometheusSampleName() throws IOException {
+        final String output = getPrometheusOutput(THREAD_COUNT_NAME, null);
+
+        assertTrue(output.contains(THREAD_COUNT_NAME), "Thread Count name not found");
+        assertFalse(output.contains(HEAP_USAGE_NAME), "Heap Usage name not filtered");
+    }
+
+    @Test
+    public void testGetFlowMetricsPrometheusSampleNameStartsWithPattern() throws IOException {
+        final String output = getPrometheusOutput(HEAP_STARTS_WITH_PATTERN, null);
+
+        assertTrue(output.contains(HEAP_USAGE_NAME), "Heap Usage name not found");
+        assertTrue(output.contains(HEAP_USED_NAME), "Heap Used name not found");
+        assertFalse(output.contains(THREAD_COUNT_NAME), "Thread Count name not filtered");
+    }
+
+    @Test
+    public void testGetFlowMetricsPrometheusSampleLabelValue() throws IOException {
+        final String output = getPrometheusOutput(null, LABEL_VALUE);
+
+        assertTrue(output.contains(LABEL_VALUE), "Label Value not found");
+        assertFalse(output.contains(OTHER_LABEL_VALUE), "Other Label Value not filtered");
+    }
+
+    @Test
+    public void testGetFlowMetricsPrometheusSampleNameAndSampleLabelValue() throws IOException {
+        final String output = getPrometheusOutput(THREAD_COUNT_NAME, LABEL_VALUE);
+
+        // Samples are retained when either the name or a label value matches, so both labels and other names remain
+        assertTrue(output.contains(THREAD_COUNT_NAME), "Thread Count name not found");
+        assertTrue(output.contains(THREAD_COUNT_LABEL), "Thread Count with label not found");
+        assertTrue(output.contains(THREAD_COUNT_OTHER_LABEL), "Thread Count with other label not found");
+        assertTrue(output.contains(HEAP_USAGE_NAME), "Heap Usage name not found");
+    }
+
+    /**
+     * Stub the Service Facade with JVM registries, request Prometheus Flow Metrics, verify the response and return the body
+     *
+     * @param sampleName Sample name filter expression or null
+     * @param sampleLabelValue Sample label value filter expression or null
+     * @return Response body decoded as UTF-8
+     * @throws IOException Thrown on failure to write the response entity
+     */
+    private String getPrometheusOutput(final String sampleName, final String sampleLabelValue) throws IOException {
+        final List<CollectorRegistry> registries = getCollectorRegistries();
+        when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
+
+        final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), sampleName, sampleLabelValue);
+
+        assertNotNull(response);
+        assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
+
+        return getResponseOutput(response);
+    }
+
+    private String getResponseOutput(final Response response) throws IOException {
+        final StreamingOutput streamingOutput = (StreamingOutput) response.getEntity();
+        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        streamingOutput.write(outputStream);
+        final byte[] outputBytes = outputStream.toByteArray();
+        return new String(outputBytes, StandardCharsets.UTF_8);
+    }
+
+    private List<CollectorRegistry> getCollectorRegistries() {
+        final JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
+        final CollectorRegistry jvmCollectorRegistry = PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), LABEL_VALUE);
+        final CollectorRegistry otherJvmCollectorRegistry = PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), OTHER_LABEL_VALUE);
+        return Arrays.asList(jvmCollectorRegistry, otherJvmCollectorRegistry);
+    }
+}
\ No newline at end of file

Reply via email to