Repository: nifi Updated Branches: refs/heads/master d09145e3e -> e8a8c19db
http://git-wip-us.apache.org/repos/asf/nifi/blob/e8a8c19d/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java index d623b6f..526b5d5 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java @@ -60,6 +60,8 @@ import org.apache.nifi.remote.TransferDirection; + "However, all process groups are recursively searched for matching components, regardless of whether the process group matches the component filters.") public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTask { + static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder() .name("Platform") .description("The value to use for the platform field in each status record.") @@ -68,7 +70,6 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa .defaultValue("nifi") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - static final PropertyDescriptor COMPONENT_TYPE_FILTER_REGEX = new PropertyDescriptor.Builder() .name("Component Type Filter Regex") .description("A regex specifying which component types to report. Any component type matching this regex will be included. " @@ -78,7 +79,6 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa .defaultValue("(Processor|ProcessGroup|RemoteProcessGroup|RootProcessGroup|Connection|InputPort|OutputPort)") .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true)) .build(); - static final PropertyDescriptor COMPONENT_NAME_FILTER_REGEX = new PropertyDescriptor.Builder() .name("Component Name Filter Regex") .description("A regex specifying which component names to report. Any component name matching this regex will be included.") @@ -197,7 +197,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa * The component name * @return Whether the component matches both filters */ - private boolean componentMatchesFilters(final String componentType, final String componentName) { + boolean componentMatchesFilters(final String componentType, final String componentName) { return componentTypeFilter.matcher(componentType).matches() && componentNameFilter.matcher(componentName).matches(); } @@ -221,7 +221,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa * @param parentId * The parent's component id */ - private void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, + void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { final JsonObjectBuilder builder = factory.createObjectBuilder(); @@ -278,7 +278,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa } } - private void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, + void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final RemoteProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { final JsonObjectBuilder builder = factory.createObjectBuilder(); @@ -303,7 +303,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa } } - private void serializePortStatus(final String componentType, final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final PortStatus status, + void serializePortStatus(final String componentType, final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final PortStatus status, final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { final JsonObjectBuilder builder = factory.createObjectBuilder(); final String componentName = status.getName(); @@ -327,7 +327,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa } } - private void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ConnectionStatus status, final DateFormat df, + void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ConnectionStatus status, final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { final JsonObjectBuilder builder = factory.createObjectBuilder(); final String componentType = "Connection"; @@ -355,7 +355,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa } } - private void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessorStatus status, final DateFormat df, + void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessorStatus status, final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { final JsonObjectBuilder builder = factory.createObjectBuilder(); final String componentType = "Processor"; @@ -386,7 +386,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa } } - private void addCommonFields(final JsonObjectBuilder builder, final DateFormat df, final String hostname, + private static void addCommonFields(final JsonObjectBuilder builder, final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, final String componentType, final String componentName) { addField(builder, "statusId", UUID.randomUUID().toString()); @@ -400,4 +400,23 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa addField(builder, "application", applicationName); } + private static void addField(final JsonObjectBuilder builder, final String key, final Long value) { + if (value != null) { + builder.add(key, value.longValue()); + } + } + + private static void addField(final JsonObjectBuilder builder, final String key, final Integer value) { + if (value != null) { + builder.add(key, value.intValue()); + } + } + + private static void addField(final JsonObjectBuilder builder, final String key, final String value) { + if (value == null) { + return; + } + + builder.add(key, value); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e8a8c19d/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask index 652b581..0aced94 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask @@ -15,5 +15,4 @@ org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask org.apache.nifi.reporting.SiteToSiteBulletinReportingTask -org.apache.nifi.reporting.SiteToSiteStatusReportingTask -org.apache.nifi.reporting.SiteToSiteMetricsReportingTask \ No newline at end of file +org.apache.nifi.reporting.SiteToSiteStatusReportingTask \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/e8a8c19d/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html deleted file mode 100644 index 8120d6a..0000000 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html +++ /dev/null @@ -1,178 +0,0 @@ -<!DOCTYPE html> -<html lang="en"> - <!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - <head> - <meta charset="utf-8" /> - <title>SiteToSiteMetricsReportingTask</title> - - <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" /> - </head> - - <body> - <p> - The Site-to-Site Metrics Reporting Task allows the user to publish NiFi's metrics (as in the Ambari reporting task) to the - same NiFi instance or another NiFi instance. This provides a great deal of power because it allows the user to make use of - all of the different Processors that are available in NiFi in order to process or distribute that data. - </p> - - <h2>Ambari format</h2> - - <p> - There are two available output formats. The first one is the Ambari format as defined in the Ambari Metrics Collector - API which is a JSON with dynamic keys. If using this format you might be interested by the below Jolt specification to - transform the data. - </p> - - <pre> - <code> - [ - { - "operation": "shift", - "spec": { - "metrics": { - "*": { - "metrics": { - "*": { - "$": "metrics.[#4].metrics.time", - "@": "metrics.[#4].metrics.value" - } - }, - "*": "metrics.[&1].&" - } - } - } - } - ] - </code> - </pre> - - <p> - This would transform the below sample: - </p> - - <pre> - <code> - { - "metrics": [{ - "metricname": "jvm.gc.time.G1OldGeneration", - "appid": "nifi", - "instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7", - "hostname": "localhost", - "timestamp": "1520456854361", - "starttime": "1520456854361", - "metrics": { - "1520456854361": "0" - } - }, { - "metricname": "jvm.thread_states.terminated", - "appid": "nifi", - "instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7", - "hostname": "localhost", - "timestamp": "1520456854361", - "starttime": "1520456854361", - "metrics": { - "1520456854361": "0" - } - }] - } - </code> - </pre> - - <p> - into: - </p> - - <pre> - <code> - { - "metrics": [{ - "metricname": "jvm.gc.time.G1OldGeneration", - "appid": "nifi", - "instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7", - "hostname": "localhost", - "timestamp": "1520456854361", - "starttime": "1520456854361", - "metrics": { - "time": "1520456854361", - "value": "0" - } - }, { - "metricname": "jvm.thread_states.terminated", - "appid": "nifi", - "instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7", - "hostname": "localhost", - "timestamp": "1520456854361", - "starttime": "1520456854361", - "metrics": { - "time": "1520456854361", - "value": "0" - } - }] - } - </code> - </pre> - - <h2>Record format</h2> - - <p> - The second format is leveraging the record framework of NiFi so that the user can define a Record Writer and directly - specify the output format and data with the assumption that the input schema is the following: - </p> - - <pre> - <code> - { - "type" : "record", - "name" : "metrics", - "namespace" : "metrics", - "fields" : [ - { "name" : "appid", "type" : "string" }, - { "name" : "instanceid", "type" : "string" }, - { "name" : "hostname", "type" : "string" }, - { "name" : "timestamp", "type" : "long" }, - { "name" : "loadAverage1min", "type" : "double" }, - { "name" : "availableCores", "type" : "int" }, - { "name" : "FlowFilesReceivedLast5Minutes", "type" : "int" }, - { "name" : "BytesReceivedLast5Minutes", "type" : "long" }, - { "name" : "FlowFilesSentLast5Minutes", "type" : "int" }, - { "name" : "BytesSentLast5Minutes", "type" : "long" }, - { "name" : "FlowFilesQueued", "type" : "int" }, - { "name" : "BytesQueued", "type" : "long" }, - { "name" : "BytesReadLast5Minutes", "type" : "long" }, - { "name" : "BytesWrittenLast5Minutes", "type" : "long" }, - { "name" : "ActiveThreads", "type" : "int" }, - { "name" : "TotalTaskDurationSeconds", "type" : "long" }, - { "name" : "TotalTaskDurationNanoSeconds", "type" : "long" }, - { "name" : "jvmuptime", "type" : "long" }, - { "name" : "jvmheap_used", "type" : "double" }, - { "name" : "jvmheap_usage", "type" : "double" }, - { "name" : "jvmnon_heap_usage", "type" : "double" }, - { "name" : "jvmthread_statesrunnable", "type" : ["int", "null"] }, - { "name" : "jvmthread_statesblocked", "type" : ["int", "null"] }, - { "name" : "jvmthread_statestimed_waiting", "type" : ["int", "null"] }, - { "name" : "jvmthread_statesterminated", "type" : ["int", "null"] }, - { "name" : "jvmthread_count", "type" : "int" }, - { "name" : "jvmdaemon_thread_count", "type" : "int" }, - { "name" : "jvmfile_descriptor_usage", "type" : "double" }, - { "name" : "jvmgcruns", "type" : ["long", "null"] }, - { "name" : "jvmgctime", "type" : ["long", "null"] } - ] - } - </code> - </pre> - - </body> -</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/e8a8c19d/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html index 86736a6..e1841b2 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html @@ -25,7 +25,7 @@ <p> The Site-to-Site Provenance Reporting Task allows the user to publish all of the Provenance Events from a NiFi instance back to the same NiFi instance or another NiFi instance. This provides a great deal of power because it allows the user to make use of - all of the different Processors that are available in NiFi in order to process or distribute that data. When possible, it is + all of the different Processors that are available in NiFi in order to processor or distribute that data. When possible, it is advisable to send the Provenance data to a different NiFi instance than the one that this Reporting Task is running on, because when the data is received over Site-to-Site and processed, that in and of itself will generate Provenance events. As a result, there is a cycle that is created. However, the data is sent in batches (1,000 by default). This means that for each batch of Provenance events http://git-wip-us.apache.org/repos/asf/nifi/blob/e8a8c19d/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-metrics.avsc ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-metrics.avsc b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-metrics.avsc deleted file mode 100644 index 90dea10..0000000 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-metrics.avsc +++ /dev/null @@ -1,37 +0,0 @@ -{ - "type" : "record", - "name" : "metrics", - "namespace" : "metrics", - "fields" : [ - { "name" : "appid", "type" : "string" }, - { "name" : "instanceid", "type" : "string" }, - { "name" : "hostname", "type" : "string" }, - { "name" : "timestamp", "type" : "long" }, - { "name" : "loadAverage1min", "type" : "double" }, - { "name" : "availableCores", "type" : "int" }, - { "name" : "FlowFilesReceivedLast5Minutes", "type" : "int" }, - { "name" : "BytesReceivedLast5Minutes", "type" : "long" }, - { "name" : "FlowFilesSentLast5Minutes", "type" : "int" }, - { "name" : "BytesSentLast5Minutes", "type" : "long" }, - { "name" : "FlowFilesQueued", "type" : "int" }, - { "name" : "BytesQueued", "type" : "long" }, - { "name" : "BytesReadLast5Minutes", "type" : "long" }, - { "name" : "BytesWrittenLast5Minutes", "type" : "long" }, - { "name" : "ActiveThreads", "type" : "int" }, - { "name" : "TotalTaskDurationSeconds", "type" : "long" }, - { "name" : "TotalTaskDurationNanoSeconds", "type" : "long" }, - { "name" : "jvmuptime", "type" : "long" }, - { "name" : "jvmheap_used", "type" : "double" }, - { "name" : "jvmheap_usage", "type" : "double" }, - { "name" : "jvmnon_heap_usage", "type" : "double" }, - { "name" : "jvmthread_statesrunnable", "type" : ["int", "null"] }, - { "name" : "jvmthread_statesblocked", "type" : ["int", "null"] }, - { "name" : "jvmthread_statestimed_waiting", "type" : ["int", "null"] }, - { "name" : "jvmthread_statesterminated", "type" : ["int", "null"] }, - { "name" : "jvmthread_count", "type" : "int" }, - { "name" : "jvmdaemon_thread_count", "type" : "int" }, - { "name" : "jvmfile_descriptor_usage", "type" : "double" }, - { "name" : "jvmgcruns", "type" : ["long", "null"] }, - { "name" : "jvmgctime", "type" : ["long", "null"] } - ] -} http://git-wip-us.apache.org/repos/asf/nifi/blob/e8a8c19d/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java deleted file mode 100644 index c699a1c..0000000 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.reporting; - - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.UUID; - -import javax.json.Json; -import javax.json.JsonArray; -import javax.json.JsonObject; -import javax.json.JsonReader; -import javax.json.JsonValue; - -import org.apache.nifi.attribute.expression.language.StandardPropertyValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.ProcessorStatus; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.remote.Transaction; -import org.apache.nifi.remote.TransferDirection; -import org.apache.nifi.remote.client.SiteToSiteClient; -import org.apache.nifi.serialization.RecordSetWriterFactory; -import org.apache.nifi.serialization.record.MockRecordWriter; -import org.apache.nifi.state.MockStateManager; -import org.apache.nifi.util.MockPropertyValue; -import org.apache.nifi.util.TestRunner; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -public class TestSiteToSiteMetricsReportingTask { - - private ReportingContext context; - private ProcessGroupStatus status; - private TestRunner runner; - - @Before - public void setup() { - status = new ProcessGroupStatus(); - status.setId("1234"); - status.setFlowFilesReceived(5); - status.setBytesReceived(10000); - status.setFlowFilesSent(10); - status.setBytesSent(20000); - status.setQueuedCount(100); - status.setQueuedContentSize(1024L); - status.setBytesRead(60000L); - status.setBytesWritten(80000L); - status.setActiveThreadCount(5); - - // create a processor status with processing time - ProcessorStatus procStatus = new ProcessorStatus(); - procStatus.setProcessingNanos(123456789); - - Collection<ProcessorStatus> processorStatuses = new ArrayList<>(); - processorStatuses.add(procStatus); - status.setProcessorStatus(processorStatuses); - - // create a group status with processing time - ProcessGroupStatus groupStatus = new ProcessGroupStatus(); - groupStatus.setProcessorStatus(processorStatuses); - - Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>(); - groupStatuses.add(groupStatus); - status.setProcessGroupStatus(groupStatuses); - } - - public MockSiteToSiteMetricsReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException { - - final MockSiteToSiteMetricsReportingTask task = new MockSiteToSiteMetricsReportingTask(); - Map<PropertyDescriptor, String> properties = new HashMap<>(); - for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { - properties.put(descriptor, descriptor.getDefaultValue()); - } - properties.putAll(customProperties); - - context = Mockito.mock(ReportingContext.class); - Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(task)); - Mockito.doAnswer(new Answer<PropertyValue>() { - @Override - public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { - final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class); - return new MockPropertyValue(properties.get(descriptor)); - } - }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); - - final EventAccess eventAccess = Mockito.mock(EventAccess.class); - Mockito.when(context.getEventAccess()).thenReturn(eventAccess); - Mockito.when(eventAccess.getControllerStatus()).thenReturn(status); - - final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class); - MockRecordWriter writer = new MockRecordWriter(); - Mockito.when(context.getProperty(MockSiteToSiteMetricsReportingTask.RECORD_WRITER)).thenReturn(pValue); - Mockito.when(pValue.asControllerService(RecordSetWriterFactory.class)).thenReturn(writer); - - final ComponentLog logger = Mockito.mock(ComponentLog.class); - final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); - Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); - Mockito.when(initContext.getLogger()).thenReturn(logger); - task.initialize(initContext); - - return task; - } - - @Test - public void testValidationBothAmbariFormatRecordWriter() throws IOException { - ValidationContext validationContext = Mockito.mock(ValidationContext.class); - final String urlEL = "http://${hostname(true)}:8080/nifi"; - final String url = "http://localhost:8080/nifi"; - - final MockSiteToSiteMetricsReportingTask task = new MockSiteToSiteMetricsReportingTask(); - Map<PropertyDescriptor, String> properties = new HashMap<>(); - for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { - properties.put(descriptor, descriptor.getDefaultValue()); - } - - properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.AMBARI_FORMAT.getValue()); - properties.put(SiteToSiteMetricsReportingTask.DESTINATION_URL, url); - properties.put(SiteToSiteMetricsReportingTask.INSTANCE_URL, url); - properties.put(SiteToSiteMetricsReportingTask.PORT_NAME, "port"); - - final PropertyValue pValueUrl = Mockito.mock(StandardPropertyValue.class); - Mockito.when(validationContext.newPropertyValue(url)).thenReturn(pValueUrl); - Mockito.when(validationContext.newPropertyValue(urlEL)).thenReturn(pValueUrl); - Mockito.when(pValueUrl.evaluateAttributeExpressions()).thenReturn(pValueUrl); - Mockito.when(pValueUrl.getValue()).thenReturn(url); - - Mockito.doAnswer(new Answer<PropertyValue>() { - @Override - public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { - final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class); - return new MockPropertyValue(properties.get(descriptor)); - } - }).when(validationContext).getProperty(Mockito.any(PropertyDescriptor.class)); - - final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class); - Mockito.when(validationContext.getProperty(MockSiteToSiteMetricsReportingTask.RECORD_WRITER)).thenReturn(pValue); - Mockito.when(pValue.isSet()).thenReturn(true); - - // should be invalid because both ambari format and record writer are set - Collection<ValidationResult> list = task.validate(validationContext); - Assert.assertEquals(1, list.size()); - Assert.assertEquals(SiteToSiteMetricsReportingTask.RECORD_WRITER.getDisplayName(), list.iterator().next().getInput()); - } - - @Test - public void testValidationRecordFormatNoRecordWriter() throws IOException { - ValidationContext validationContext = Mockito.mock(ValidationContext.class); - final String urlEL = "http://${hostname(true)}:8080/nifi"; - final String url = "http://localhost:8080/nifi"; - - final MockSiteToSiteMetricsReportingTask task = new MockSiteToSiteMetricsReportingTask(); - Map<PropertyDescriptor, String> properties = new HashMap<>(); - for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { - properties.put(descriptor, descriptor.getDefaultValue()); - } - - properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.RECORD_FORMAT.getValue()); - properties.put(SiteToSiteMetricsReportingTask.DESTINATION_URL, url); - properties.put(SiteToSiteMetricsReportingTask.INSTANCE_URL, url); - properties.put(SiteToSiteMetricsReportingTask.PORT_NAME, "port"); - - final PropertyValue pValueUrl = Mockito.mock(StandardPropertyValue.class); - Mockito.when(validationContext.newPropertyValue(url)).thenReturn(pValueUrl); - Mockito.when(validationContext.newPropertyValue(urlEL)).thenReturn(pValueUrl); - Mockito.when(pValueUrl.evaluateAttributeExpressions()).thenReturn(pValueUrl); - Mockito.when(pValueUrl.getValue()).thenReturn(url); - - Mockito.doAnswer(new Answer<PropertyValue>() { - @Override - public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { - final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class); - return new MockPropertyValue(properties.get(descriptor)); - } - }).when(validationContext).getProperty(Mockito.any(PropertyDescriptor.class)); - - final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class); - Mockito.when(validationContext.getProperty(MockSiteToSiteMetricsReportingTask.RECORD_WRITER)).thenReturn(pValue); - Mockito.when(pValue.isSet()).thenReturn(false); - - // should be invalid because both ambari format and record writer are set - Collection<ValidationResult> list = task.validate(validationContext); - Assert.assertEquals(1, list.size()); - Assert.assertEquals(SiteToSiteMetricsReportingTask.RECORD_WRITER.getDisplayName(), list.iterator().next().getInput()); - } - - @Test - public void testAmbariFormat() throws IOException, InitializationException { - - final Map<PropertyDescriptor, String> properties = new HashMap<>(); - properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.AMBARI_FORMAT.getValue()); - - MockSiteToSiteMetricsReportingTask task = initTask(properties); - task.onTrigger(context); - - assertEquals(1, task.dataSent.size()); - final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); - JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); - JsonArray array = jsonReader.readObject().getJsonArray("metrics"); - for(int i = 0; i < array.size(); i++) { - JsonObject object = array.getJsonObject(i); - assertEquals("nifi", object.getString("appid")); - assertEquals("1234", object.getString("instanceid")); - if(object.getString("metricname").equals("FlowFilesQueued")) { - for(Entry<String, JsonValue> kv : object.getJsonObject("metrics").entrySet()) { - assertEquals("\"100\"", kv.getValue().toString()); - } - return; - } - } - fail(); - } - - @Test - public void testRecordFormat() throws IOException, InitializationException { - final Map<PropertyDescriptor, String> properties = new HashMap<>(); - properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.RECORD_FORMAT.getValue()); - properties.put(SiteToSiteMetricsReportingTask.RECORD_WRITER, "record-writer"); - MockSiteToSiteMetricsReportingTask task = initTask(properties); - - task.onTrigger(context); - - assertEquals(1, task.dataSent.size()); - String[] data = new String(task.dataSent.get(0)).split(","); - assertEquals("\"nifi\"", data[0]); - assertEquals("\"1234\"", data[1]); - assertEquals("\"100\"", data[10]); // FlowFilesQueued - } - - private static final class MockSiteToSiteMetricsReportingTask extends SiteToSiteMetricsReportingTask { - - public MockSiteToSiteMetricsReportingTask() throws IOException { - super(); - } - - final List<byte[]> dataSent = new ArrayList<>(); - - @Override - protected SiteToSiteClient getClient() { - final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class); - final Transaction transaction = Mockito.mock(Transaction.class); - - try { - Mockito.doAnswer(new Answer<Object>() { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable { - final byte[] data = invocation.getArgumentAt(0, byte[].class); - dataSent.add(data); - return null; - } - }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class)); - - Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction); - } catch (final Exception e) { - e.printStackTrace(); - Assert.fail(e.toString()); - } - - return client; - } - } - -}
