This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new dd7131b NIFI-9435 Added registries and names include parameters to
Flow Metrics
dd7131b is described below
commit dd7131b257113581a8485b2c59184c7eecc96ce6
Author: exceptionfactory <[email protected]>
AuthorDate: Fri Dec 3 15:30:11 2021 -0600
NIFI-9435 Added registries and names include parameters to Flow Metrics
- Added optional includedRegistries query parameter to Flow Metrics
Resource method supporting one or more registries
- Added optional includedNames query parameter to Flow Metrics Resource
method supporting one or more metric family names
- Added sampleName and sampleLabelValue optional pattern parameters
- Added FilteringMetricFamilySamplesEnumeration to support streamed
filtering
- Added PrometheusMetricsWriter and TextFormat implementation
Signed-off-by: Joe Gresock <[email protected]>
This closes #5571.
---
.../org/apache/nifi/web/NiFiServiceFacade.java | 9 +
.../apache/nifi/web/StandardNiFiServiceFacade.java | 36 +++-
.../java/org/apache/nifi/web/api/FlowResource.java | 224 ++++++++-------------
.../FilteringMetricFamilySamplesEnumeration.java | 130 ++++++++++++
.../web/api/metrics/PrometheusMetricsWriter.java | 37 ++++
.../metrics/TextFormatPrometheusMetricsWriter.java | 71 +++++++
.../nifi/web/api/request/FlowMetricsProducer.java | 34 ++++
.../nifi/web/api/request/FlowMetricsRegistry.java | 53 +++++
.../org/apache/nifi/web/api/TestFlowResource.java | 177 ++++++++++++++++
9 files changed, 623 insertions(+), 148 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 10f13cd..c53627c 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -130,6 +130,7 @@ import
org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import org.apache.nifi.web.api.entity.VersionedFlowEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
+import org.apache.nifi.web.api.request.FlowMetricsRegistry;
import java.util.Collection;
import java.util.Date;
@@ -324,6 +325,14 @@ public interface NiFiServiceFacade {
Collection<CollectorRegistry> generateFlowMetrics();
/**
+ * Generate metrics for the flow and return selected registries
+ *
+ * @param includeRegistries Set of Flow Metrics Registries to be returned
+ * @return Collector Registries
+ */
+ Collection<CollectorRegistry> generateFlowMetrics(Set<FlowMetricsRegistry>
includeRegistries);
+
+ /**
* Updates the configuration for this controller.
*
* @param revision Revision to compare with current base revision
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 7bf6dd7..7b734ef 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -120,6 +120,7 @@ import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.ParameterReferenceManager;
import org.apache.nifi.parameter.StandardParameterContext;
import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.prometheus.util.AbstractMetricsRegistry;
import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
@@ -308,6 +309,7 @@ import
org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import org.apache.nifi.web.api.entity.VersionedFlowEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
+import org.apache.nifi.web.api.request.FlowMetricsRegistry;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.AccessPolicyDAO;
import org.apache.nifi.web.dao.ConnectionDAO;
@@ -418,12 +420,19 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
private AuthorizableLookup authorizableLookup;
// Prometheus Metrics objects
- private NiFiMetricsRegistry nifiMetricsRegistry = new
NiFiMetricsRegistry();
- private JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
- private ConnectionAnalyticsMetricsRegistry
connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
- private BulletinMetricsRegistry bulletinMetricsRegistry = new
BulletinMetricsRegistry();
+ private final NiFiMetricsRegistry nifiMetricsRegistry = new
NiFiMetricsRegistry();
+ private final JvmMetricsRegistry jvmMetricsRegistry = new
JvmMetricsRegistry();
+ private final ConnectionAnalyticsMetricsRegistry
connectionAnalyticsMetricsRegistry = new ConnectionAnalyticsMetricsRegistry();
+ private final BulletinMetricsRegistry bulletinMetricsRegistry = new
BulletinMetricsRegistry();
+
+ private final Collection<AbstractMetricsRegistry> configuredRegistries =
Arrays.asList(
+ nifiMetricsRegistry,
+ jvmMetricsRegistry,
+ connectionAnalyticsMetricsRegistry,
+ bulletinMetricsRegistry
+ );
- public final Collection<CollectorRegistry> ALL_REGISTRIES = Arrays.asList(
+ private final Collection<CollectorRegistry> metricsRegistries =
Arrays.asList(
nifiMetricsRegistry.getRegistry(),
jvmMetricsRegistry.getRegistry(),
connectionAnalyticsMetricsRegistry.getRegistry(),
@@ -5654,7 +5663,22 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
);
}
}
- return ALL_REGISTRIES;
+ return metricsRegistries;
+ }
+
+ @Override
+ public Collection<CollectorRegistry> generateFlowMetrics(final
Set<FlowMetricsRegistry> includeRegistries) {
+ final Set<FlowMetricsRegistry> selectedRegistries =
includeRegistries.isEmpty() ? new
HashSet<>(Arrays.asList(FlowMetricsRegistry.values())) : includeRegistries;
+
+ final Set<Class<? extends AbstractMetricsRegistry>> registryClasses =
selectedRegistries.stream()
+ .map(FlowMetricsRegistry::getRegistryClass)
+ .collect(Collectors.toSet());
+
+ generateFlowMetrics();
+ return configuredRegistries.stream()
+ .filter(configuredRegistry ->
registryClasses.contains(configuredRegistry.getClass()))
+ .map(AbstractMetricsRegistry::getRegistry)
+ .collect(Collectors.toList());
}
@Override
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 35a7c81..79a31d0 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -111,8 +111,12 @@ import org.apache.nifi.web.api.entity.VersionedFlowEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataSetEntity;
import org.apache.nifi.web.api.entity.VersionedFlowsEntity;
+import org.apache.nifi.web.api.metrics.TextFormatPrometheusMetricsWriter;
+import org.apache.nifi.web.api.metrics.PrometheusMetricsWriter;
import org.apache.nifi.web.api.request.BulletinBoardPatternParameter;
import org.apache.nifi.web.api.request.DateTimeParameter;
+import org.apache.nifi.web.api.request.FlowMetricsProducer;
+import org.apache.nifi.web.api.request.FlowMetricsRegistry;
import org.apache.nifi.web.api.request.IntegerParameter;
import org.apache.nifi.web.api.request.LongParameter;
@@ -131,13 +135,10 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
-import java.io.BufferedWriter;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
import java.text.Collator;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
+import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
@@ -190,9 +191,8 @@ public class FlowResource extends ApplicationResource {
* Populates the remaining fields in the specified process group.
*
* @param flow group
- * @return group dto
*/
- private ProcessGroupFlowDTO
populateRemainingFlowContent(ProcessGroupFlowDTO flow) {
+ private void populateRemainingFlowContent(ProcessGroupFlowDTO flow) {
FlowDTO flowStructure = flow.getFlow();
// populate the remaining fields for the processors, connections,
process group refs, remote process groups, and labels if appropriate
@@ -202,14 +202,12 @@ public class FlowResource extends ApplicationResource {
// set the process group uri
flow.setUri(generateResourceUri("flow", "process-groups",
flow.getId()));
-
- return flow;
}
/**
* Populates the remaining content of the specified snippet.
*/
- private FlowDTO populateRemainingFlowStructure(FlowDTO flowStructure) {
+ private void populateRemainingFlowStructure(FlowDTO flowStructure) {
processorResource.populateRemainingProcessorEntitiesContent(flowStructure.getProcessors());
connectionResource.populateRemainingConnectionEntitiesContent(flowStructure.getConnections());
inputPortResource.populateRemainingInputPortEntitiesContent(flowStructure.getInputPorts());
@@ -226,8 +224,6 @@ public class FlowResource extends ApplicationResource {
processGroup.setContents(null);
}
}
-
- return flowStructure;
}
/**
@@ -371,8 +367,7 @@ public class FlowResource extends ApplicationResource {
)
public Response getFlow(
@ApiParam(
- value = "The process group id.",
- required = false
+ value = "The process group id."
)
@PathParam("id") final String groupId,
@QueryParam("uiOnly") @DefaultValue("false") final boolean uiOnly)
{
@@ -393,7 +388,6 @@ public class FlowResource extends ApplicationResource {
* Retrieves the metrics of the entire flow.
*
* @return A flowMetricsEntity.
- * @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@@ -418,30 +412,36 @@ public class FlowResource extends ApplicationResource {
public Response getFlowMetrics(
@ApiParam(
value = "The producer for flow file metrics. Each producer
may have its own output format.",
- required = true
+ required = true,
+ allowableValues = "prometheus"
+ )
+ @PathParam("producer") final String producer,
+ @ApiParam(
+ value = "Set of included metrics registries",
+ allowableValues = "NIFI,JVM,BULLETIN,CONNECTION"
+ )
+ @QueryParam("includedRegistries") final Set<FlowMetricsRegistry>
includedRegistries,
+ @ApiParam(
+ value = "Regular Expression Pattern to be applied against
the sample name field"
)
- @PathParam("producer") final String producer) throws
InterruptedException {
+ @QueryParam("sampleName") final String sampleName,
+ @ApiParam(
+ value = "Regular Expression Pattern to be applied against
the sample label value field"
+ )
+ @QueryParam("sampleLabelValue") final String sampleLabelValue
+ ) {
authorizeFlow();
- if ("prometheus".equalsIgnoreCase(producer)) {
- // get this process group flow
- final Collection<CollectorRegistry> allRegistries =
serviceFacade.generateFlowMetrics();
- // generate a streaming response
- final StreamingOutput response = output -> {
- Writer writer = new BufferedWriter(new
OutputStreamWriter(output));
- for (CollectorRegistry collectorRegistry : allRegistries) {
- TextFormat.write004(writer,
collectorRegistry.metricFamilySamples());
- // flush the response
- output.flush();
- }
- writer.flush();
- writer.close();
- };
+ final Set<FlowMetricsRegistry> selectedRegistries = includedRegistries
== null ? Collections.emptySet() : includedRegistries;
+ final Collection<CollectorRegistry> registries =
serviceFacade.generateFlowMetrics(selectedRegistries);
- return generateOkResponse(response)
- .type(MediaType.TEXT_PLAIN_TYPE)
- .build();
+ if
(FlowMetricsProducer.PROMETHEUS.getProducer().equalsIgnoreCase(producer)) {
+ final StreamingOutput response = (outputStream -> {
+ final PrometheusMetricsWriter prometheusMetricsWriter = new
TextFormatPrometheusMetricsWriter(sampleName, sampleLabelValue);
+ prometheusMetricsWriter.write(registries, outputStream);
+ });
+ return
generateOkResponse(response).type(TextFormat.CONTENT_TYPE_004).build();
} else {
throw new ResourceNotFoundException("The specified producer is
missing or invalid.");
}
@@ -500,7 +500,6 @@ public class FlowResource extends ApplicationResource {
* Retrieves all the of controller services in this NiFi.
*
* @return A controllerServicesEntity.
- * @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@@ -525,7 +524,7 @@ public class FlowResource extends ApplicationResource {
@ApiParam(value = "The process group id.", required = true)
@PathParam("id") String groupId,
@ApiParam("Whether or not to include parent/ancestory process
groups") @QueryParam("includeAncestorGroups") @DefaultValue("true") boolean
includeAncestorGroups,
@ApiParam("Whether or not to include descendant process groups")
@QueryParam("includeDescendantGroups") @DefaultValue("false") boolean
includeDescendantGroups
- ) throws InterruptedException {
+ ) {
authorizeFlow();
@@ -705,25 +704,19 @@ public class FlowResource extends ApplicationResource {
group.findAllProcessors().stream()
.filter(getProcessorFilter.get())
.filter(processor ->
OperationAuthorizable.isOperationAuthorized(processor, authorizer,
NiFiUserUtils.getNiFiUser()))
- .forEach(processor -> {
- componentIds.add(processor.getIdentifier());
- });
+ .forEach(processor ->
componentIds.add(processor.getIdentifier()));
// ensure authorized for each input port we will attempt to
schedule
group.findAllInputPorts().stream()
.filter(getPortFilter.get())
.filter(inputPort ->
OperationAuthorizable.isOperationAuthorized(inputPort, authorizer,
NiFiUserUtils.getNiFiUser()))
- .forEach(inputPort -> {
- componentIds.add(inputPort.getIdentifier());
- });
+ .forEach(inputPort ->
componentIds.add(inputPort.getIdentifier()));
// ensure authorized for each output port we will attempt to
schedule
group.findAllOutputPorts().stream()
.filter(getPortFilter.get())
.filter(outputPort ->
OperationAuthorizable.isOperationAuthorized(outputPort, authorizer,
NiFiUserUtils.getNiFiUser()))
- .forEach(outputPort -> {
- componentIds.add(outputPort.getIdentifier());
- });
+ .forEach(outputPort ->
componentIds.add(outputPort.getIdentifier()));
return componentIds;
});
@@ -936,7 +929,6 @@ public class FlowResource extends ApplicationResource {
*
* @param value Search string
* @return A searchResultsEntity
- * @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@@ -961,7 +953,7 @@ public class FlowResource extends ApplicationResource {
public Response searchFlow(
@QueryParam("q") @DefaultValue(StringUtils.EMPTY) String value,
@QueryParam("a") @DefaultValue(StringUtils.EMPTY) String
activeGroupId
- ) throws InterruptedException {
+ ) {
authorizeFlow();
// query the controller
@@ -1022,7 +1014,6 @@ public class FlowResource extends ApplicationResource {
* Retrieves the cluster summary for this NiFi.
*
* @return A clusterSummaryEntity.
- * @throws InterruptedException if interrupted
*/
@GET
@Consumes(MediaType.WILDCARD)
@@ -1043,7 +1034,7 @@ public class FlowResource extends ApplicationResource {
@ApiResponse(code = 409, message = "The request was valid
but NiFi was not in the appropriate state to process it. Retrying the same
request later may be successful.")
}
)
- public Response getClusterSummary() throws InterruptedException {
+ public Response getClusterSummary() {
authorizeFlow();
@@ -1187,18 +1178,15 @@ public class FlowResource extends ApplicationResource {
)
public Response getProcessorTypes(
@ApiParam(
- value = "If specified, will only return types that are a
member of this bundle group.",
- required = false
+ value = "If specified, will only return types that are a
member of this bundle group."
)
@QueryParam("bundleGroupFilter") String bundleGroupFilter,
@ApiParam(
- value = "If specified, will only return types that are a
member of this bundle artifact.",
- required = false
+ value = "If specified, will only return types that are a
member of this bundle artifact."
)
@QueryParam("bundleArtifactFilter") String bundleArtifactFilter,
@ApiParam(
- value = "If specified, will only return types whose fully
qualified classname matches.",
- required = false
+ value = "If specified, will only return types whose fully
qualified classname matches."
)
@QueryParam("type") String typeFilter) throws InterruptedException
{
@@ -1245,38 +1233,31 @@ public class FlowResource extends ApplicationResource {
)
public Response getControllerServiceTypes(
@ApiParam(
- value = "If specified, will only return controller
services that are compatible with this type of service.",
- required = false
+ value = "If specified, will only return controller
services that are compatible with this type of service."
)
@QueryParam("serviceType") String serviceType,
@ApiParam(
- value = "If serviceType specified, is the bundle group of
the serviceType.",
- required = false
+ value = "If serviceType specified, is the bundle group of
the serviceType."
)
@QueryParam("serviceBundleGroup") String serviceBundleGroup,
@ApiParam(
- value = "If serviceType specified, is the bundle artifact
of the serviceType.",
- required = false
+ value = "If serviceType specified, is the bundle artifact
of the serviceType."
)
@QueryParam("serviceBundleArtifact") String serviceBundleArtifact,
@ApiParam(
- value = "If serviceType specified, is the bundle version
of the serviceType.",
- required = false
+ value = "If serviceType specified, is the bundle version
of the serviceType."
)
@QueryParam("serviceBundleVersion") String serviceBundleVersion,
@ApiParam(
- value = "If specified, will only return types that are a
member of this bundle group.",
- required = false
+ value = "If specified, will only return types that are a
member of this bundle group."
)
@QueryParam("bundleGroupFilter") String bundleGroupFilter,
@ApiParam(
- value = "If specified, will only return types that are a
member of this bundle artifact.",
- required = false
+ value = "If specified, will only return types that are a
member of this bundle artifact."
)
@QueryParam("bundleArtifactFilter") String bundleArtifactFilter,
@ApiParam(
- value = "If specified, will only return types whose fully
qualified classname matches.",
- required = false
+ value = "If specified, will only return types whose fully
qualified classname matches."
)
@QueryParam("typeFilter") String typeFilter) throws
InterruptedException {
@@ -1329,18 +1310,15 @@ public class FlowResource extends ApplicationResource {
)
public Response getReportingTaskTypes(
@ApiParam(
- value = "If specified, will only return types that are a
member of this bundle group.",
- required = false
+ value = "If specified, will only return types that are a
member of this bundle group."
)
@QueryParam("bundleGroupFilter") String bundleGroupFilter,
@ApiParam(
- value = "If specified, will only return types that are a
member of this bundle artifact.",
- required = false
+ value = "If specified, will only return types that are a
member of this bundle artifact."
)
@QueryParam("bundleArtifactFilter") String bundleArtifactFilter,
@ApiParam(
- value = "If specified, will only return types whose fully
qualified classname matches.",
- required = false
+ value = "If specified, will only return types whose fully
qualified classname matches."
)
@QueryParam("type") String typeFilter) throws InterruptedException
{
@@ -1520,12 +1498,7 @@ public class FlowResource extends ApplicationResource {
}
private SortedSet<BucketEntity> sortBuckets(final Set<BucketEntity>
buckets) {
- final SortedSet<BucketEntity> sortedBuckets = new TreeSet<>(new
Comparator<BucketEntity>() {
- @Override
- public int compare(final BucketEntity entity1, final BucketEntity
entity2) {
- return Collator.getInstance().compare(getBucketName(entity1),
getBucketName(entity2));
- }
- });
+ final SortedSet<BucketEntity> sortedBuckets = new TreeSet<>((entity1,
entity2) -> Collator.getInstance().compare(getBucketName(entity1),
getBucketName(entity2)));
sortedBuckets.addAll(buckets);
return sortedBuckets;
@@ -1573,12 +1546,7 @@ public class FlowResource extends ApplicationResource {
}
private SortedSet<VersionedFlowEntity> sortFlows(final
Set<VersionedFlowEntity> versionedFlows) {
- final SortedSet<VersionedFlowEntity> sortedFlows = new TreeSet<>(new
Comparator<VersionedFlowEntity>() {
- @Override
- public int compare(final VersionedFlowEntity entity1, final
VersionedFlowEntity entity2) {
- return Collator.getInstance().compare(getFlowName(entity1),
getFlowName(entity2));
- }
- });
+ final SortedSet<VersionedFlowEntity> sortedFlows = new
TreeSet<>((entity1, entity2) ->
Collator.getInstance().compare(getFlowName(entity1), getFlowName(entity2)));
sortedFlows.addAll(versionedFlows);
return sortedFlows;
@@ -1670,33 +1638,27 @@ public class FlowResource extends ApplicationResource {
)
public Response getBulletinBoard(
@ApiParam(
- value = "Includes bulletins with an id after this value.",
- required = false
+ value = "Includes bulletins with an id after this value."
)
@QueryParam("after") LongParameter after,
@ApiParam(
- value = "Includes bulletins originating from this sources
whose name match this regular expression.",
- required = false
+ value = "Includes bulletins originating from this sources
whose name match this regular expression."
)
@QueryParam("sourceName") BulletinBoardPatternParameter sourceName,
@ApiParam(
- value = "Includes bulletins whose message that match this
regular expression.",
- required = false
+ value = "Includes bulletins whose message that match this
regular expression."
)
@QueryParam("message") BulletinBoardPatternParameter message,
@ApiParam(
- value = "Includes bulletins originating from this sources
whose id match this regular expression.",
- required = false
+ value = "Includes bulletins originating from this sources
whose id match this regular expression."
)
@QueryParam("sourceId") BulletinBoardPatternParameter sourceId,
@ApiParam(
- value = "Includes bulletins originating from this sources
whose group id match this regular expression.",
- required = false
+ value = "Includes bulletins originating from this sources
whose group id match this regular expression."
)
@QueryParam("groupId") BulletinBoardPatternParameter groupId,
@ApiParam(
- value = "The number of bulletins to limit the response
to.",
- required = false
+ value = "The number of bulletins to limit the response to."
)
@QueryParam("limit") IntegerParameter limit) throws
InterruptedException {
@@ -1773,13 +1735,11 @@ public class FlowResource extends ApplicationResource {
)
public Response getProcessorStatus(
@ApiParam(
- value = "Whether or not to include the breakdown per node.
Optional, defaults to false",
- required = false
+ value = "Whether or not to include the breakdown per node.
Optional, defaults to false"
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
- value = "The id of the node where to get the status.",
- required = false
+ value = "The id of the node where to get the status."
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
@@ -1846,13 +1806,11 @@ public class FlowResource extends ApplicationResource {
)
public Response getInputPortStatus(
@ApiParam(
- value = "Whether or not to include the breakdown per node.
Optional, defaults to false",
- required = false
+ value = "Whether or not to include the breakdown per node.
Optional, defaults to false"
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
- value = "The id of the node where to get the status.",
- required = false
+ value = "The id of the node where to get the status."
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
@@ -1919,13 +1877,11 @@ public class FlowResource extends ApplicationResource {
)
public Response getOutputPortStatus(
@ApiParam(
- value = "Whether or not to include the breakdown per node.
Optional, defaults to false",
- required = false
+ value = "Whether or not to include the breakdown per node.
Optional, defaults to false"
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
- value = "The id of the node where to get the status.",
- required = false
+ value = "The id of the node where to get the status."
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
@@ -1992,18 +1948,15 @@ public class FlowResource extends ApplicationResource {
)
public Response getRemoteProcessGroupStatus(
@ApiParam(
- value = "Whether or not to include the breakdown per node.
Optional, defaults to false",
- required = false
+ value = "Whether or not to include the breakdown per node.
Optional, defaults to false"
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
- value = "The id of the node where to get the status.",
- required = false
+ value = "The id of the node where to get the status."
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
- value = "The remote process group id.",
- required = true
+ value = "The remote process group id."
)
@PathParam("id") String id) throws InterruptedException {
@@ -2068,18 +2021,15 @@ public class FlowResource extends ApplicationResource {
)
public Response getProcessGroupStatus(
@ApiParam(
- value = "Whether all descendant groups and the status of
their content will be included. Optional, defaults to false",
- required = false
+ value = "Whether all descendant groups and the status of
their content will be included. Optional, defaults to false"
)
@QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean
recursive,
@ApiParam(
- value = "Whether or not to include the breakdown per node.
Optional, defaults to false",
- required = false
+ value = "Whether or not to include the breakdown per node.
Optional, defaults to false"
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
- value = "The id of the node where to get the status.",
- required = false
+ value = "The id of the node where to get the status."
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
@@ -2146,13 +2096,11 @@ public class FlowResource extends ApplicationResource {
)
public Response getConnectionStatus(
@ApiParam(
- value = "Whether or not to include the breakdown per node.
Optional, defaults to false",
- required = false
+ value = "Whether or not to include the breakdown per node.
Optional, defaults to false"
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
- value = "The id of the node where to get the status.",
- required = false
+ value = "The id of the node where to get the status."
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
@@ -2219,13 +2167,11 @@ public class FlowResource extends ApplicationResource {
)
public Response getConnectionStatistics(
@ApiParam(
- value = "Whether or not to include the breakdown per node.
Optional, defaults to false",
- required = false
+ value = "Whether or not to include the breakdown per node.
Optional, defaults to false"
)
@QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
@ApiParam(
- value = "The id of the node where to get the statistics.",
- required = false
+ value = "The id of the node where to get the statistics."
)
@QueryParam("clusterNodeId") String clusterNodeId,
@ApiParam(
@@ -2550,33 +2496,27 @@ public class FlowResource extends ApplicationResource {
)
@QueryParam("count") IntegerParameter count,
@ApiParam(
- value = "The field to sort on.",
- required = false
+ value = "The field to sort on."
)
@QueryParam("sortColumn") String sortColumn,
@ApiParam(
- value = "The direction to sort.",
- required = false
+ value = "The direction to sort."
)
@QueryParam("sortOrder") String sortOrder,
@ApiParam(
- value = "Include actions after this date.",
- required = false
+ value = "Include actions after this date."
)
@QueryParam("startDate") DateTimeParameter startDate,
@ApiParam(
- value = "Include actions before this date.",
- required = false
+ value = "Include actions before this date."
)
@QueryParam("endDate") DateTimeParameter endDate,
@ApiParam(
- value = "Include actions performed by this user.",
- required = false
+ value = "Include actions performed by this user."
)
@QueryParam("userIdentity") String userIdentity,
@ApiParam(
- value = "Include actions on this component.",
- required = false
+ value = "Include actions on this component."
)
@QueryParam("sourceId") String sourceId) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/FilteringMetricFamilySamplesEnumeration.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/FilteringMetricFamilySamplesEnumeration.java
new file mode 100644
index 0000000..f9f2e40
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/FilteringMetricFamilySamplesEnumeration.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.metrics;
+
+import io.prometheus.client.Collector;
+
+import java.util.Enumeration;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Enumeration wrapping Prometheus Collector Samples with filtering based on multiple patterns.
+ *
+ * Each Metric Family read from the wrapped Enumeration has its non-matching Samples removed in place,
+ * and Metric Families left without any Samples are skipped. When both patterns are provided, a Sample
+ * is retained when either its name or any one of its label values matches. When only one pattern is
+ * provided, only that pattern is evaluated. When no pattern is provided, every Sample is retained.
+ * Patterns are applied with {@link Matcher#matches()} and must match the entire value.
+ */
+public class FilteringMetricFamilySamplesEnumeration implements Enumeration<Collector.MetricFamilySamples> {
+    private final Enumeration<Collector.MetricFamilySamples> metricFamilySamples;
+
+    private final Pattern sampleNamePattern;
+
+    private final Pattern sampleLabelValuePattern;
+
+    /** Next element to be returned, or null when the wrapped Enumeration has no more matching elements */
+    private Collector.MetricFamilySamples nextElement;
+
+    /**
+     * Filtering Metric Family Samples Enumeration with required properties. The first matching element
+     * is read from the wrapped Enumeration during construction.
+     *
+     * @param metricFamilySamples Metric Family Samples to be filtered, must not be null
+     * @param sampleNamePattern Pattern used to match against the Sample.name field, null disables name matching
+     * @param sampleLabelValuePattern Pattern used to match against the Sample.labelValues field, null disables label value matching
+     */
+    public FilteringMetricFamilySamplesEnumeration(
+            final Enumeration<Collector.MetricFamilySamples> metricFamilySamples,
+            final Pattern sampleNamePattern,
+            final Pattern sampleLabelValuePattern
+    ) {
+        this.metricFamilySamples = Objects.requireNonNull(metricFamilySamples);
+        this.sampleNamePattern = sampleNamePattern;
+        this.sampleLabelValuePattern = sampleLabelValuePattern;
+        setNextElement();
+    }
+
+    /**
+     * Has More Elements based on whether the next element is set from a previous operation
+     *
+     * @return More Elements status
+     */
+    @Override
+    public boolean hasMoreElements() {
+        return nextElement != null;
+    }
+
+    /**
+     * Get Next Element and set next available element before returning
+     *
+     * @return Next Element based on applied filters
+     * @throws NoSuchElementException Thrown when no matching elements remain
+     */
+    @Override
+    public Collector.MetricFamilySamples nextElement() {
+        if (nextElement == null) {
+            throw new NoSuchElementException();
+        }
+        final Collector.MetricFamilySamples currentElement = nextElement;
+        setNextElement();
+        return currentElement;
+    }
+
+    /**
+     * Set Next Element to the next Metric Family that still has Samples after filtering,
+     * or to null when the wrapped Enumeration is exhausted
+     */
+    private void setNextElement() {
+        nextElement = null;
+        while (nextElement == null && metricFamilySamples.hasMoreElements()) {
+            final Collector.MetricFamilySamples possibleNextElement = metricFamilySamples.nextElement();
+            // Filtering mutates the Samples list of the wrapped element directly
+            possibleNextElement.samples.removeIf(this::isSampleNotMatched);
+            if (!possibleNextElement.samples.isEmpty()) {
+                nextElement = possibleNextElement;
+            }
+        }
+    }
+
+    /**
+     * Determine whether the Sample should be removed based on the configured patterns
+     *
+     * @param sample Sample to be evaluated
+     * @return true when the Sample matches none of the configured patterns
+     */
+    private boolean isSampleNotMatched(final Collector.MetricFamilySamples.Sample sample) {
+        final boolean notMatched;
+
+        if (sampleNamePattern == null && sampleLabelValuePattern == null) {
+            // No patterns configured: retain every Sample instead of dereferencing a null Pattern
+            notMatched = false;
+        } else if (sampleNamePattern == null) {
+            notMatched = isSampleLabelValueNotMatched(sample);
+        } else if (sampleLabelValuePattern == null) {
+            notMatched = isSampleNameNotMatched(sample);
+        } else {
+            // Both patterns configured: remove only when neither the name nor any label value matches
+            notMatched = isSampleNameNotMatched(sample) && isSampleLabelValueNotMatched(sample);
+        }
+
+        return notMatched;
+    }
+
+    /**
+     * Evaluate Sample name against the Sample Name Pattern, which must not be null
+     */
+    private boolean isSampleNameNotMatched(final Collector.MetricFamilySamples.Sample sample) {
+        final Matcher sampleNameMatcher = sampleNamePattern.matcher(sample.name);
+        return !sampleNameMatcher.matches();
+    }
+
+    /**
+     * Evaluate Sample label values against the Sample Label Value Pattern, which must not be null.
+     * A Sample without label values is never matched.
+     */
+    private boolean isSampleLabelValueNotMatched(final Collector.MetricFamilySamples.Sample sample) {
+        boolean notMatched = true;
+
+        for (final String labelValue : sample.labelValues) {
+            final Matcher sampleLabelValueMatcher = sampleLabelValuePattern.matcher(labelValue);
+            if (sampleLabelValueMatcher.matches()) {
+                notMatched = false;
+                break;
+            }
+        }
+
+        return notMatched;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/PrometheusMetricsWriter.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/PrometheusMetricsWriter.java
new file mode 100644
index 0000000..cdc2926
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/PrometheusMetricsWriter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.metrics;
+
+import io.prometheus.client.CollectorRegistry;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
+
+/**
+ * Prometheus Metrics Writer abstraction for serializing the metrics held in one or more
+ * Collector Registries to an Output Stream
+ */
+public interface PrometheusMetricsWriter {
+ /**
+ * Write collection of metrics registries to provided stream.
+ *
+ * Callers should not assume the stream remains open afterwards: the
+ * TextFormatPrometheusMetricsWriter implementation wraps the stream in a Writer
+ * that is closed when writing completes.
+ *
+ * @param registries Collector Registries containing the metrics to be written
+ * @param outputStream Output Stream receiving the serialized metrics
+ * @throws IOException Thrown on failure to write metrics
+ */
+ void write(Collection<CollectorRegistry> registries, OutputStream
outputStream) throws IOException;
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
new file mode 100644
index 0000000..9631fd4
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/metrics/TextFormatPrometheusMetricsWriter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.metrics;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.regex.Pattern;
+
+/**
+ * Prometheus Metrics Writer supporting Prometheus Text Version 0.0.4 with optional filtering.
+ *
+ * Output is always encoded as UTF-8, as required by the Prometheus text exposition format,
+ * regardless of the platform default character set.
+ */
+public class TextFormatPrometheusMetricsWriter implements PrometheusMetricsWriter {
+    private final Pattern sampleNamePattern;
+
+    private final Pattern sampleLabelValuePattern;
+
+    private final boolean filteringDisabled;
+
+    /**
+     * Text Format Prometheus Metrics Writer with optional filter expressions
+     *
+     * @param sampleName Regular expression matched against Sample names, null or blank disables name filtering
+     * @param sampleLabelValue Regular expression matched against Sample label values, null or blank disables label value filtering
+     * @throws java.util.regex.PatternSyntaxException Thrown when a non-blank expression is not a valid regular expression
+     */
+    public TextFormatPrometheusMetricsWriter(
+            final String sampleName,
+            final String sampleLabelValue
+    ) {
+        this.sampleNamePattern = StringUtils.isBlank(sampleName) ? null : Pattern.compile(sampleName);
+        this.sampleLabelValuePattern = StringUtils.isBlank(sampleLabelValue) ? null : Pattern.compile(sampleLabelValue);
+        this.filteringDisabled = StringUtils.isAllBlank(sampleName, sampleLabelValue);
+    }
+
+    /**
+     * Write the samples of each registry to the stream in Prometheus Text Version 0.0.4 format,
+     * flushing after each registry. The stream is closed when writing completes or fails.
+     *
+     * @param registries Collector Registries
+     * @param outputStream Output Stream
+     * @throws IOException Thrown on failure to write metrics
+     */
+    @Override
+    public void write(final Collection<CollectorRegistry> registries, final OutputStream outputStream) throws IOException {
+        // Explicit UTF-8 avoids depending on the platform default charset for non-ASCII help text and label values
+        try (final Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) {
+            for (final CollectorRegistry collectorRegistry : registries) {
+                final Enumeration<Collector.MetricFamilySamples> samples = getSamples(collectorRegistry);
+                TextFormat.write004(writer, samples);
+                writer.flush();
+            }
+        }
+    }
+
+    /**
+     * Get samples from the registry, wrapped with filtering unless both filter expressions were blank
+     */
+    private Enumeration<Collector.MetricFamilySamples> getSamples(final CollectorRegistry registry) {
+        final Enumeration<Collector.MetricFamilySamples> samples = registry.metricFamilySamples();
+        return filteringDisabled ? samples : new FilteringMetricFamilySamplesEnumeration(
+                samples,
+                sampleNamePattern,
+                sampleLabelValuePattern
+        );
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsProducer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsProducer.java
new file mode 100644
index 0000000..e34c2cd
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsProducer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.request;
+
+/**
+ * Supported Flow Metrics Producers, each carrying the producer name used to select it
+ */
+public enum FlowMetricsProducer {
+    PROMETHEUS("prometheus");
+
+    private final String producer;
+
+    FlowMetricsProducer(final String producerName) {
+        producer = producerName;
+    }
+
+    /**
+     * Get producer name
+     *
+     * @return Producer name associated with this constant
+     */
+    public String getProducer() {
+        return this.producer;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsRegistry.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsRegistry.java
new file mode 100644
index 0000000..c827ae8
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/request/FlowMetricsRegistry.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.request;
+
+import org.apache.nifi.prometheus.util.AbstractMetricsRegistry;
+import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
+import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
+import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
+import org.apache.nifi.prometheus.util.NiFiMetricsRegistry;
+
+/**
+ * Flow Metrics Registries, each pairing a registry name with the Metrics Registry class it represents
+ */
+public enum FlowMetricsRegistry {
+    NIFI("NIFI", NiFiMetricsRegistry.class),
+
+    JVM("JVM", JvmMetricsRegistry.class),
+
+    BULLETIN("BULLETIN", BulletinMetricsRegistry.class),
+
+    CONNECTION("CONNECTION", ConnectionAnalyticsMetricsRegistry.class);
+
+    private final String registry;
+
+    private final Class<? extends AbstractMetricsRegistry> registryClass;
+
+    FlowMetricsRegistry(final String registryName, final Class<? extends AbstractMetricsRegistry> metricsRegistryClass) {
+        registry = registryName;
+        registryClass = metricsRegistryClass;
+    }
+
+    /**
+     * Get registry name
+     *
+     * @return Registry name associated with this constant
+     */
+    public String getRegistry() {
+        return this.registry;
+    }
+
+    /**
+     * Get Metrics Registry class
+     *
+     * @return Metrics Registry class associated with this constant
+     */
+    public Class<? extends AbstractMetricsRegistry> getRegistryClass() {
+        return this.registryClass;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
new file mode 100644
index 0000000..bc7fdf1
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/api/TestFlowResource.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
+import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
+import org.apache.nifi.prometheus.util.PrometheusMetricsUtil;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.ResourceNotFoundException;
+import org.apache.nifi.web.api.request.FlowMetricsProducer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the Flow Metrics method of FlowResource covering producer validation
+ * and the optional sample name and sample label value filters
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestFlowResource {
+    private static final String LABEL_VALUE = TestFlowResource.class.getSimpleName();
+
+    private static final String OTHER_LABEL_VALUE = JmxJvmMetrics.class.getSimpleName();
+
+    private static final String THREAD_COUNT_NAME = "nifi_jvm_thread_count";
+
+    private static final String HEAP_USAGE_NAME = "nifi_jvm_heap_usage";
+
+    private static final String HEAP_USED_NAME = "nifi_jvm_heap_used";
+
+    private static final String HEAP_STARTS_WITH_PATTERN = "nifi_jvm_heap.*";
+
+    private static final String THREAD_COUNT_LABEL = String.format("nifi_jvm_thread_count{instance=\"%s\"", LABEL_VALUE);
+
+    private static final String THREAD_COUNT_OTHER_LABEL = String.format("nifi_jvm_thread_count{instance=\"%s\"", OTHER_LABEL_VALUE);
+
+    @InjectMocks
+    private FlowResource resource = new FlowResource();
+
+    @Mock
+    private NiFiServiceFacade serviceFacade;
+
+    @Test
+    public void testGetFlowMetricsProducerInvalid() {
+        assertThrows(ResourceNotFoundException.class, () -> resource.getFlowMetrics(String.class.toString(), Collections.emptySet(), null, null));
+    }
+
+    @Test
+    public void testGetFlowMetricsPrometheus() throws IOException {
+        final String output = getPrometheusOutput(null, null);
+
+        assertTrue(output.contains(THREAD_COUNT_NAME), "Thread Count name not found");
+        assertTrue(output.contains(HEAP_USAGE_NAME), "Heap Usage name not found");
+    }
+
+    @Test
+    public void testGetFlowMetricsPrometheusSampleName() throws IOException {
+        final String output = getPrometheusOutput(THREAD_COUNT_NAME, null);
+
+        assertTrue(output.contains(THREAD_COUNT_NAME), "Thread Count name not found");
+        assertFalse(output.contains(HEAP_USAGE_NAME), "Heap Usage name not filtered");
+    }
+
+    @Test
+    public void testGetFlowMetricsPrometheusSampleNameStartsWithPattern() throws IOException {
+        final String output = getPrometheusOutput(HEAP_STARTS_WITH_PATTERN, null);
+
+        assertTrue(output.contains(HEAP_USAGE_NAME), "Heap Usage name not found");
+        assertTrue(output.contains(HEAP_USED_NAME), "Heap Used name not found");
+        // Message corrected: this assertion checks the Thread Count metric, not Heap Usage
+        assertFalse(output.contains(THREAD_COUNT_NAME), "Thread Count name not filtered");
+    }
+
+    @Test
+    public void testGetFlowMetricsPrometheusSampleLabelValue() throws IOException {
+        final String output = getPrometheusOutput(null, LABEL_VALUE);
+
+        assertTrue(output.contains(LABEL_VALUE), "Label Value not found");
+        assertFalse(output.contains(OTHER_LABEL_VALUE), "Other Label Value not filtered");
+    }
+
+    @Test
+    public void testGetFlowMetricsPrometheusSampleNameAndSampleLabelValue() throws IOException {
+        final String output = getPrometheusOutput(THREAD_COUNT_NAME, LABEL_VALUE);
+
+        // Samples are retained when either the name or a label value matches
+        assertTrue(output.contains(THREAD_COUNT_NAME), "Thread Count name not found");
+        assertTrue(output.contains(THREAD_COUNT_LABEL), "Thread Count with label not found");
+        assertTrue(output.contains(THREAD_COUNT_OTHER_LABEL), "Thread Count with other label not found");
+        assertTrue(output.contains(HEAP_USAGE_NAME), "Heap Usage name not found");
+    }
+
+    /**
+     * Request Prometheus Flow Metrics with the provided filters, verify the response
+     * is present with the Prometheus Text Version 0.0.4 media type, and return the response body
+     *
+     * @param sampleName Sample name filter expression or null
+     * @param sampleLabelValue Sample label value filter expression or null
+     * @return Response body decoded as UTF-8
+     * @throws IOException Thrown on failure to read the response body
+     */
+    private String getPrometheusOutput(final String sampleName, final String sampleLabelValue) throws IOException {
+        final List<CollectorRegistry> registries = getCollectorRegistries();
+        when(serviceFacade.generateFlowMetrics(anySet())).thenReturn(registries);
+
+        final Response response = resource.getFlowMetrics(FlowMetricsProducer.PROMETHEUS.getProducer(), Collections.emptySet(), sampleName, sampleLabelValue);
+
+        assertNotNull(response);
+        assertEquals(MediaType.valueOf(TextFormat.CONTENT_TYPE_004), response.getMediaType());
+
+        return getResponseOutput(response);
+    }
+
+    /**
+     * Write the Streaming Output entity of the response to memory and decode it as UTF-8
+     */
+    private String getResponseOutput(final Response response) throws IOException {
+        final StreamingOutput streamingOutput = (StreamingOutput) response.getEntity();
+        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        streamingOutput.write(outputStream);
+        final byte[] outputBytes = outputStream.toByteArray();
+        return new String(outputBytes, StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Create JVM metrics twice against one JVM Metrics Registry using two different instance label values
+     */
+    private List<CollectorRegistry> getCollectorRegistries() {
+        final JvmMetricsRegistry jvmMetricsRegistry = new JvmMetricsRegistry();
+        final CollectorRegistry jvmCollectorRegistry = PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), LABEL_VALUE);
+        final CollectorRegistry otherJvmCollectorRegistry = PrometheusMetricsUtil.createJvmMetrics(jvmMetricsRegistry, JmxJvmMetrics.getInstance(), OTHER_LABEL_VALUE);
+        return Arrays.asList(jvmCollectorRegistry, otherJvmCollectorRegistry);
+    }
+}
\ No newline at end of file