Repository: nifi Updated Branches: refs/heads/master 7df5c2dc8 -> de67e5f7d
NIFI-3674: Implementing SiteToSiteStatusReportingTask Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ea6320d6 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ea6320d6 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ea6320d6 Branch: refs/heads/master Commit: ea6320d621398713c6aec98e0d79509fd8586e17 Parents: 7df5c2d Author: Joe Gresock <[email protected]> Authored: Fri Apr 7 15:45:40 2017 +0000 Committer: Bryan Bende <[email protected]> Committed: Mon May 1 09:55:10 2017 -0400 ---------------------------------------------------------------------- .../SiteToSiteStatusReportingTask.java | 415 +++++++++++++++++++ .../org.apache.nifi.reporting.ReportingTask | 3 +- .../TestSiteToSiteStatusReportingTask.java | 346 ++++++++++++++++ 3 files changed, 763 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/ea6320d6/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java new file mode 100644 index 0000000..d94acc9 --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.reporting; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonArrayBuilder; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObjectBuilder; +import javax.json.JsonValue; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.PortStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; + +@Tags({"status", "metrics", "history", "site", "site to site"}) +@CapabilityDescription("Publishes Status events using the Site To Site protocol. " + + "The component type and name filter regexes form a union: only components matching both regexes will be reported. " + + "However, all process groups are recursively searched for matching components, regardless of whether the process group matches the component filters.") +public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTask { + + static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + + static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder() + .name("Platform") + .description("The value to use for the platform field in each provenance event.") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("nifi") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor COMPONENT_TYPE_FILTER_REGEX = new PropertyDescriptor.Builder() + .name("Component Type Filter Regex") + .description("A regex specifying which component types to report. Any component type matching this regex will be included. " + + "Component types are: Processor, RootProcessGroup, ProcessGroup, RemoteProcessGroup, Connection, InputPort, OutputPort") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("(Processor|ProcessGroup|RemoteProcessGroup|RootProcessGroup|Connection|InputPort|OutputPort)") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); + static final PropertyDescriptor COMPONENT_NAME_FILTER_REGEX = new PropertyDescriptor.Builder() + .name("Component Name Filter Regex") + .description("A regex specifying which component names to report. Any component name matching this regex will be included.") + .required(true) + .expressionLanguageSupported(true) + .defaultValue(".*") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); + + private volatile Pattern componentTypeFilter; + private volatile Pattern componentNameFilter; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(PLATFORM); + properties.add(COMPONENT_TYPE_FILTER_REGEX); + properties.add(COMPONENT_NAME_FILTER_REGEX); + return properties; + } + + @Override + public void onTrigger(final ReportingContext context) { + final boolean isClustered = context.isClustered(); + final String nodeId = context.getClusterNodeIdentifier(); + if (nodeId == null && isClustered) { + getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. " + + "Will wait for Node Identifier to be established."); + return; + } + + componentTypeFilter = Pattern.compile(context.getProperty(COMPONENT_TYPE_FILTER_REGEX).getValue()); + componentNameFilter = Pattern.compile(context.getProperty(COMPONENT_NAME_FILTER_REGEX).getValue()); + + final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus(); + final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName(); + + final String nifiUrl = context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue(); + URL url; + try { + url = new URL(nifiUrl); + } catch (final MalformedURLException e1) { + // already validated + throw new AssertionError(); + } + + final String hostname = url.getHost(); + final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue(); + + final Map<String, ?> config = Collections.emptyMap(); + final JsonBuilderFactory factory = Json.createBuilderFactory(config); + + final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT); + df.setTimeZone(TimeZone.getTimeZone("Z")); + + final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder(); + serializeProcessGroupStatus(arrayBuilder, factory, procGroupStatus, df, hostname, rootGroupName, + platform, null, new Date()); + + final JsonArray jsonArray = arrayBuilder.build(); + + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + int fromIndex = 0; + int toIndex = Math.min(batchSize, jsonArray.size()); + List<JsonValue> jsonBatch = jsonArray.subList(fromIndex, toIndex); + + while(!jsonBatch.isEmpty()) { + // Send the JSON document for the current batch + try { + long start = System.nanoTime(); + final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); + if (transaction == null) { + getLogger().debug("All destination nodes are penalized; will attempt to send data later"); + return; + } + + final Map<String, String> attributes = new HashMap<>(); + final String transactionId = UUID.randomUUID().toString(); + attributes.put("reporting.task.transaction.id", transactionId); + attributes.put("mime.type", "application/json"); + + JsonArrayBuilder jsonBatchArrayBuilder = factory.createArrayBuilder(); + for(JsonValue jsonValue : jsonBatch) { + jsonBatchArrayBuilder.add(jsonValue); + } + final JsonArray jsonBatchArray = jsonBatchArrayBuilder.build(); + + final byte[] data = jsonBatchArray.toString().getBytes(StandardCharsets.UTF_8); + transaction.send(data, attributes); + transaction.confirm(); + transaction.complete(); + + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + getLogger().info("Successfully sent {} Status Records to destination in {} ms; Transaction ID = {}", + new Object[]{jsonArray.size(), transferMillis, transactionId}); + + fromIndex = toIndex; + toIndex = Math.min(fromIndex + batchSize, jsonArray.size()); + jsonBatch = jsonArray.subList(fromIndex, toIndex); + } catch (final IOException e) { + throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e); + } + } + } + + /** + * Returns true only if the component type matches the component type filter + * and the component name matches the component name filter. + * + * @param componentType + * The component type + * @param componentName + * The component name + * @return Whether the component matches both filters + */ + boolean componentMatchesFilters(final String componentType, final String componentName) { + return componentTypeFilter.matcher(componentType).matches() + && componentNameFilter.matcher(componentName).matches(); + } + + /** + * Serialize the ProcessGroupStatus and add it to the JsonArrayBuilder. + * @param arrayBuilder + * The JSON Array builder + * @param factory + * The JSON Builder Factory + * @param status + * The ProcessGroupStatus + * @param df + * A date format + * @param hostname + * The current hostname + * @param applicationName + * The root process group name + * @param platform + * The configured platform + * @param parentId + * The parent's component id + */ + void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, + final ProcessGroupStatus status, final DateFormat df, + final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { + final JsonObjectBuilder builder = factory.createObjectBuilder(); + final String componentType = (parentId == null) ? "RootProcessGroup" : "ProcessGroup"; + final String componentName = status.getName(); + + if (componentMatchesFilters(componentType, componentName)) { + addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, + componentType, componentName); + + addField(builder, "componentId", status.getId()); + addField(builder, "bytesRead", status.getBytesRead()); + addField(builder, "bytesWritten", status.getBytesWritten()); + addField(builder, "bytesReceived", status.getBytesReceived()); + addField(builder, "bytesSent", status.getBytesSent()); + addField(builder, "bytesTransferred", status.getBytesTransferred()); + addField(builder, "flowFilesReceived", status.getFlowFilesReceived()); + addField(builder, "flowFilesSent", status.getFlowFilesSent()); + addField(builder, "flowFilesTransferred", status.getFlowFilesTransferred()); + addField(builder, "inputContentSize", status.getInputContentSize()); + addField(builder, "inputCount", status.getInputCount()); + addField(builder, "outputContentSize", status.getOutputContentSize()); + addField(builder, "outputCount", status.getOutputCount()); + addField(builder, "queuedContentSize", status.getQueuedContentSize()); + addField(builder, "activeThreadCount", status.getActiveThreadCount()); + addField(builder, "queuedCount", status.getQueuedCount()); + + arrayBuilder.add(builder.build()); + } + + for(ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) { + serializeProcessGroupStatus(arrayBuilder, factory, childGroupStatus, df, hostname, + applicationName, platform, status.getId(), currentDate); + } + for(ProcessorStatus processorStatus : status.getProcessorStatus()) { + serializeProcessorStatus(arrayBuilder, factory, processorStatus, df, hostname, + applicationName, platform, status.getId(), currentDate); + } + for(ConnectionStatus connectionStatus : status.getConnectionStatus()) { + serializeConnectionStatus(arrayBuilder, factory, connectionStatus, df, hostname, + applicationName, platform, status.getId(), currentDate); + } + for(PortStatus portStatus : status.getInputPortStatus()) { + serializePortStatus("InputPort", arrayBuilder, factory, portStatus, df, + hostname, applicationName, platform, status.getId(), currentDate); + } + for(PortStatus portStatus : status.getOutputPortStatus()) { + serializePortStatus("OutputPort", arrayBuilder, factory, portStatus, df, + hostname, applicationName, platform, status.getId(), currentDate); + } + for(RemoteProcessGroupStatus remoteProcessGroupStatus : status.getRemoteProcessGroupStatus()) { + serializeRemoteProcessGroupStatus(arrayBuilder, factory, remoteProcessGroupStatus, df, hostname, + applicationName, platform, status.getId(), currentDate); + } + } + + void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, + final RemoteProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName, + final String platform, final String parentId, final Date currentDate) { + final JsonObjectBuilder builder = factory.createObjectBuilder(); + final String componentType = "RemoteProcessGroup"; + final String componentName = status.getName(); + + if (componentMatchesFilters(componentType, componentName)) { + addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, + componentType, componentName); + + addField(builder, "componentId", status.getId()); + addField(builder, "activeRemotePortCount", status.getActiveRemotePortCount()); + addField(builder, "activeThreadCount", status.getActiveThreadCount()); + addField(builder, "inactiveRemotePortCount", status.getInactiveRemotePortCount()); + addField(builder, "receivedContentSize", status.getReceivedContentSize()); + addField(builder, "receivedCount", status.getReceivedCount()); + addField(builder, "sentContentSize", status.getSentContentSize()); + addField(builder, "sentCount", status.getSentCount()); + addField(builder, "averageLineageDuration", status.getAverageLineageDuration()); + + arrayBuilder.add(builder.build()); + } + } + + void serializePortStatus(final String componentType, final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final PortStatus status, + final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { + final JsonObjectBuilder builder = factory.createObjectBuilder(); + final String componentName = status.getName(); + + if (componentMatchesFilters(componentType, componentName)) { + addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, + componentType, componentName); + + addField(builder, "componentId", status.getId()); + addField(builder, "activeThreadCount", status.getActiveThreadCount()); + addField(builder, "bytesReceived", status.getBytesReceived()); + addField(builder, "bytesSent", status.getBytesSent()); + addField(builder, "flowFilesReceived", status.getFlowFilesReceived()); + addField(builder, "flowFilesSent", status.getFlowFilesSent()); + addField(builder, "inputBytes", status.getInputBytes()); + addField(builder, "inputCount", status.getInputCount()); + addField(builder, "outputBytes", status.getOutputBytes()); + addField(builder, "outputCount", status.getOutputCount()); + + arrayBuilder.add(builder.build()); + } + } + + void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ConnectionStatus status, final DateFormat df, + final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { + final JsonObjectBuilder builder = factory.createObjectBuilder(); + final String componentType = "Connection"; + final String componentName = status.getName(); + + if (componentMatchesFilters(componentType, componentName)) { + addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, + componentType, componentName); + + addField(builder, "componentId", status.getId()); + addField(builder, "maxQueuedBytes", status.getMaxQueuedBytes()); + addField(builder, "maxQueuedCount", status.getMaxQueuedCount()); + addField(builder, "queuedBytes", status.getQueuedBytes()); + addField(builder, "queuedCount", status.getQueuedCount()); + addField(builder, "inputBytes", status.getInputBytes()); + addField(builder, "inputCount", status.getInputCount()); + addField(builder, "outputBytes", status.getOutputBytes()); + addField(builder, "outputCount", status.getOutputCount()); + + arrayBuilder.add(builder.build()); + } + } + + void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessorStatus status, final DateFormat df, + final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { + final JsonObjectBuilder builder = factory.createObjectBuilder(); + final String componentType = "Processor"; + final String componentName = status.getName(); + + if (componentMatchesFilters(componentType, componentName)) { + addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, componentType, componentName); + + addField(builder, "componentId", status.getId()); + addField(builder, "processorType", status.getType()); + addField(builder, "averageLineageDurationMS", status.getAverageLineageDuration()); + addField(builder, "bytesRead", status.getBytesRead()); + addField(builder, "bytesWritten", status.getBytesWritten()); + addField(builder, "bytesReceived", status.getBytesReceived()); + addField(builder, "bytesSent", status.getBytesSent()); + addField(builder, "flowFilesRemoved", status.getFlowFilesRemoved()); + addField(builder, "flowFilesReceived", status.getFlowFilesReceived()); + addField(builder, "flowFilesSent", status.getFlowFilesSent()); + addField(builder, "inputCount", status.getInputCount()); + addField(builder, "inputBytes", status.getInputBytes()); + addField(builder, "outputCount", status.getOutputCount()); + addField(builder, "outputBytes", status.getOutputBytes()); + addField(builder, "activeThreadCount", status.getActiveThreadCount()); + addField(builder, "invocations", status.getInvocations()); + addField(builder, "processingNanos", status.getProcessingNanos()); + + arrayBuilder.add(builder.build()); + } + } + + private static void addCommonFields(final JsonObjectBuilder builder, final DateFormat df, final String hostname, + final String applicationName, final String platform, final String parentId, final Date currentDate, + final String componentType, final String componentName) { + addField(builder, "statusId", UUID.randomUUID().toString()); + addField(builder, "timestampMillis", currentDate.getTime()); + addField(builder, "timestamp", df.format(currentDate)); + addField(builder, "actorHostname", hostname); + addField(builder, "componentType", componentType); + addField(builder, "componentName", componentName); + addField(builder, "parentId", parentId); + addField(builder, "platform", platform); + addField(builder, "application", applicationName); + } + + private static void addField(final JsonObjectBuilder builder, final String key, final Long value) { + if (value != null) { + builder.add(key, value.longValue()); + } + } + + private static void addField(final JsonObjectBuilder builder, final String key, final Integer value) { + if (value != null) { + builder.add(key, value.intValue()); + } + } + + private static void addField(final JsonObjectBuilder builder, final String key, final String value) { + if (value == null) { + return; + } + + builder.add(key, value); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/ea6320d6/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask index bdf61cc..0aced94 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask @@ -14,4 +14,5 @@ # limitations under the License. org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask -org.apache.nifi.reporting.SiteToSiteBulletinReportingTask \ No newline at end of file +org.apache.nifi.reporting.SiteToSiteBulletinReportingTask +org.apache.nifi.reporting.SiteToSiteStatusReportingTask \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/ea6320d6/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java new file mode 100644 index 0000000..3c737d1 --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.reporting; + + +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import javax.json.Json; +import javax.json.JsonReader; +import javax.json.JsonString; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.PortStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.state.MockStateManager; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockPropertyValue; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestSiteToSiteStatusReportingTask { + private ReportingContext context; + + public MockSiteToSiteStatusReportingTask initTask(Map<PropertyDescriptor, String> customProperties, + ProcessGroupStatus pgStatus) throws InitializationException { + final MockSiteToSiteStatusReportingTask task = new MockSiteToSiteStatusReportingTask(); + Map<PropertyDescriptor, String> properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.putAll(customProperties); + + context = Mockito.mock(ReportingContext.class); + Mockito.when(context.getStateManager()) + .thenReturn(new MockStateManager(task)); + Mockito.doAnswer(new Answer<PropertyValue>() { + @Override + public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { + final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class); + return new MockPropertyValue(properties.get(descriptor)); + } + }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); + + final EventAccess eventAccess = Mockito.mock(EventAccess.class); + + Mockito.when(context.getEventAccess()).thenReturn(eventAccess); + Mockito.when(eventAccess.getControllerStatus()).thenReturn(pgStatus); + + final ComponentLog logger = Mockito.mock(ComponentLog.class); + final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); + Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); + Mockito.when(initContext.getLogger()).thenReturn(logger); + task.initialize(initContext); + + return task; + } + + @Test + public void testSerializedForm() throws IOException, InitializationException { + final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0); + + final Map<PropertyDescriptor, String> properties = new HashMap<>(); + properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, ".*"); + + MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus); + task.onTrigger(context); + + assertEquals(16, task.dataSent.size()); + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId"); + assertEquals(pgStatus.getId(), componentId.getString()); + } + + @Test + public void testComponentTypeFilter() throws IOException, InitializationException { + final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0); + + final Map<PropertyDescriptor, String> properties = new HashMap<>(); + properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(ProcessGroup|RootProcessGroup)"); + + MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus); + task.onTrigger(context); + + assertEquals(1, task.dataSent.size()); // Only root pg and 3 child pgs + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId"); + assertEquals(pgStatus.getId(), componentId.getString()); + } + + @Test + public void testComponentNameFilter() throws IOException, InitializationException { + final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0); + + final Map<PropertyDescriptor, String> properties = new HashMap<>(); + properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*processor.*"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, ".*"); + + MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus); + task.onTrigger(context); + + assertEquals(3, task.dataSent.size()); // 3 processors for each of 4 groups + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId"); + assertEquals("root.1.processor.1", componentId.getString()); + } + + @Test + public void testComponentNameFilter_nested() throws IOException, InitializationException { + final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 2, 0); + + final Map<PropertyDescriptor, String> properties = new HashMap<>(); + properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*processor.*"); + properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, ".*"); + + MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus); + task.onTrigger(context); + + assertEquals(10, task.dataSent.size()); // 3 + (3 * 3) + (3 * 3 * 3) = 39, or 10 batches of 4 + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonString componentId = jsonReader.readArray().getJsonObject(0).getJsonString("componentId"); + assertEquals("root.1.1.processor.1", componentId.getString()); + } + + public static ProcessGroupStatus generateProcessGroupStatus(String id, String namePrefix, + int maxRecursion, int currentDepth) { + Collection<ConnectionStatus> cStatus = new ArrayList<>(); + Collection<PortStatus> ipStatus = new ArrayList<>(); + Collection<PortStatus> opStatus = new ArrayList<>(); + Collection<ProcessorStatus> pStatus = new ArrayList<>(); + Collection<RemoteProcessGroupStatus> rpgStatus = new ArrayList<>(); + Collection<ProcessGroupStatus> childPgStatus = new ArrayList<>(); + + if (currentDepth < maxRecursion) { + for(int i = 1; i < 4; i++) { + childPgStatus.add(generateProcessGroupStatus(id + "." + i, namePrefix + "." + i, + maxRecursion, currentDepth + 1)); + } + } + for(int i = 1; i < 4; i++) { + pStatus.add(generateProcessorStatus(id + ".processor." + i, namePrefix + ".processor." + i)); + } + for(int i = 1; i < 4; i++) { + cStatus.add(generateConnectionStatus(id + ".connection." + i, namePrefix + ".connection." + i)); + } + for(int i = 1; i < 4; i++) { + rpgStatus.add(generateRemoteProcessGroupStatus(id + ".rpg." + i, namePrefix + ".rpg." + i)); + } + for(int i = 1; i < 4; i++) { + ipStatus.add(generatePortStatus(id + ".ip." + i, namePrefix + ".ip." + i)); + } + for(int i = 1; i < 4; i++) { + opStatus.add(generatePortStatus(id + ".op." + i, namePrefix + ".op." + i)); + } + + ProcessGroupStatus pgStatus = new ProcessGroupStatus(); + pgStatus.setId(id); + pgStatus.setName(namePrefix + "-" + UUID.randomUUID().toString()); + pgStatus.setInputPortStatus(ipStatus); + pgStatus.setOutputPortStatus(opStatus); + pgStatus.setProcessGroupStatus(childPgStatus); + pgStatus.setRemoteProcessGroupStatus(rpgStatus); + pgStatus.setProcessorStatus(pStatus); + + pgStatus.setActiveThreadCount(1); + pgStatus.setBytesRead(2L); + pgStatus.setBytesReceived(3l); + pgStatus.setBytesSent(4l); + pgStatus.setBytesTransferred(5l); + pgStatus.setBytesWritten(6l); + pgStatus.setConnectionStatus(cStatus); + pgStatus.setFlowFilesReceived(7); + pgStatus.setFlowFilesSent(8); + pgStatus.setFlowFilesTransferred(9); + pgStatus.setInputContentSize(10l); + pgStatus.setInputCount(11); + pgStatus.setOutputContentSize(12l); + pgStatus.setOutputCount(13); + pgStatus.setQueuedContentSize(14l); + pgStatus.setQueuedCount(15); + + return pgStatus; + } + + public static PortStatus generatePortStatus(String id, String namePrefix) { + PortStatus pStatus = new PortStatus(); + pStatus.setId(id); + pStatus.setName(namePrefix + "-" + UUID.randomUUID().toString()); + pStatus.setActiveThreadCount(0); + pStatus.setBytesReceived(1l); + pStatus.setBytesSent(2l); + pStatus.setFlowFilesReceived(3); + pStatus.setFlowFilesSent(4); + pStatus.setInputBytes(5l); + pStatus.setInputCount(6); + pStatus.setOutputBytes(7l); + pStatus.setOutputCount(8); + + return pStatus; + } + + public static ProcessorStatus generateProcessorStatus(String id, String namePrefix) { + ProcessorStatus pStatus = new ProcessorStatus(); + pStatus.setId(id); + pStatus.setName(namePrefix + "-" + UUID.randomUUID().toString()); + pStatus.setActiveThreadCount(0); + pStatus.setAverageLineageDuration(1l); + pStatus.setBytesRead(2l); + pStatus.setBytesReceived(3l); + pStatus.setBytesSent(4l); + pStatus.setBytesWritten(5l); + pStatus.setFlowFilesReceived(6); + pStatus.setFlowFilesRemoved(7); + pStatus.setFlowFilesSent(8); + pStatus.setInputBytes(9l); + pStatus.setInputCount(10); + pStatus.setInvocations(11); + pStatus.setOutputBytes(12l); + pStatus.setOutputCount(13); + pStatus.setProcessingNanos(14l); + pStatus.setType("type"); + + return pStatus; + } + + public static RemoteProcessGroupStatus generateRemoteProcessGroupStatus(String id, String namePrefix) { + RemoteProcessGroupStatus rpgStatus = new RemoteProcessGroupStatus(); + rpgStatus.setId(id); + rpgStatus.setName(namePrefix + "-" + UUID.randomUUID().toString()); + rpgStatus.setActiveRemotePortCount(0); + rpgStatus.setActiveThreadCount(1); + rpgStatus.setAverageLineageDuration(2l); + rpgStatus.setInactiveRemotePortCount(3); + rpgStatus.setReceivedContentSize(4l); + rpgStatus.setReceivedCount(5); + rpgStatus.setSentContentSize(6l); + rpgStatus.setSentCount(7); + rpgStatus.setTargetUri("uri"); + + return rpgStatus; + } + + public static ConnectionStatus generateConnectionStatus(String id, String namePrefix) { + ConnectionStatus cStatus = new ConnectionStatus(); + cStatus.setId(id); + cStatus.setName(namePrefix + "-" + UUID.randomUUID().toString()); + cStatus.setBackPressureBytesThreshold(0l); + cStatus.setBackPressureObjectThreshold(1l); + cStatus.setInputBytes(2l); + cStatus.setInputCount(3); + cStatus.setMaxQueuedBytes(4l); + cStatus.setMaxQueuedCount(5); + cStatus.setOutputBytes(6); + cStatus.setOutputCount(7); + cStatus.setQueuedBytes(8l); + cStatus.setQueuedCount(9); + + return cStatus; + } + + public static FlowFile createFlowFile(final long id, final Map<String, String> attributes) { + MockFlowFile mockFlowFile = new MockFlowFile(id); + mockFlowFile.putAttributes(attributes); + return mockFlowFile; + } + + private static final class MockSiteToSiteStatusReportingTask extends SiteToSiteStatusReportingTask { + + final List<byte[]> dataSent = new ArrayList<>(); + + @Override + protected SiteToSiteClient getClient() { + final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class); + final Transaction transaction = Mockito.mock(Transaction.class); + + try { + Mockito.doAnswer(new Answer<Object>() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + final byte[] data = invocation.getArgumentAt(0, byte[].class); + dataSent.add(data); + return null; + } + }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class)); + + Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction); + } catch (final Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + + return client; + } + + public List<byte[]> getDataSent() { + return dataSent; + } + } + +}
