Repository: nifi-minifi Updated Branches: refs/heads/master 59f2d4418 -> 61e3a925d
MINIFI-13 created a provenance reporting task to send provenance information via S2S This closes #6 Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/61e3a925 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/61e3a925 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/61e3a925 Branch: refs/heads/master Commit: 61e3a925d3af52da04a2866a298bb88264b20c5b Parents: 59f2d44 Author: Joseph Percivall <[email protected]> Authored: Fri Apr 8 13:17:33 2016 -0400 Committer: Joseph Percivall <[email protected]> Committed: Tue Apr 12 17:03:59 2016 -0400 ---------------------------------------------------------------------- minifi-assembly/pom.xml | 6 + .../minifi-framework-bundle/pom.xml | 2 +- .../minifi-provenance-reporting-nar/pom.xml | 41 ++ .../src/main/resources/META-INF/NOTICE | 15 + .../minifi-provenance-reporting-task/pom.xml | 82 ++++ .../reporting/ProvenanceReportingTask.java | 457 +++++++++++++++++++ .../org.apache.nifi.reporting.ReportingTask | 16 + .../reporting/TestProvenanceReportingTask.java | 186 ++++++++ .../minifi-provenance-reporting-bundle/pom.xml | 41 ++ minifi-nar-bundles/pom.xml | 3 +- pom.xml | 13 +- 11 files changed, 854 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-assembly/pom.xml b/minifi-assembly/pom.xml index b076459..8dd223a 100644 --- a/minifi-assembly/pom.xml +++ b/minifi-assembly/pom.xml @@ -105,6 +105,12 @@ limitations under the License. 
<artifactId>minifi-runtime</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.nifi.minifi</groupId> + <artifactId>minifi-provenance-reporting-nar</artifactId> + <version>0.0.1-SNAPSHOT</version> + <type>nar</type> + </dependency> <!-- MiNiFi NiFi Dependencies --> <dependency> http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-framework-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/pom.xml b/minifi-nar-bundles/minifi-framework-bundle/pom.xml index 7fbd6b7..dca3a5c 100644 --- a/minifi-nar-bundles/minifi-framework-bundle/pom.xml +++ b/minifi-nar-bundles/minifi-framework-bundle/pom.xml @@ -20,7 +20,7 @@ limitations under the License. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>minifi-nar-bundles</artifactId> - <groupId>org.apache.nifi</groupId> + <groupId>org.apache.nifi.minifi</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/pom.xml b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/pom.xml new file mode 100644 index 0000000..be74790 --- /dev/null +++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/pom.xml @@ -0,0 +1,41 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <!-- + Licensed to the Apache Software Foundation 
(ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi.minifi</groupId> + <artifactId>minifi-provenance-reporting-bundle</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>minifi-provenance-reporting-nar</artifactId> + <packaging>nar</packaging> + <properties> + <maven.javadoc.skip>true</maven.javadoc.skip> + <source.skip>true</source.skip> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.nifi.minifi</groupId> + <artifactId>minifi-provenance-reporting-task</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services-api-nar</artifactId> + <type>nar</type> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/src/main/resources/META-INF/NOTICE 
b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..be55e59 --- /dev/null +++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,15 @@ +minifi-provenance-reporting-task-nar +Copyright 2015-2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +************************ +Common Development and Distribution License 1.1 +************************ + +The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details. + + (CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net) + (CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net) + http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/pom.xml b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/pom.xml new file mode 100644 index 0000000..2eb157e --- /dev/null +++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/pom.xml @@ -0,0 +1,82 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi.minifi</groupId> + <artifactId>minifi-provenance-reporting-bundle</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>minifi-provenance-reporting-task</artifactId> + <description>Publishes MiNiFi metrics to NiFi via S2S</description> + <version>0.0.1-SNAPSHOT</version> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-site-to-site-client</artifactId> + </dependency> + <dependency> + <groupId>org.glassfish</groupId> + <artifactId>javax.json</artifactId> + <version>1.0.4</version> + </dependency> + <dependency> + <groupId>javax.json</groupId> + <artifactId>javax.json-api</artifactId> + <version>1.0</version> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + 
<scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-data-provenance-utils</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java new file mode 100644 index 0000000..8ed5dee --- /dev/null +++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.provenance.reporting; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonArrayBuilder; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; +import javax.net.ssl.SSLContext; + +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.status.PortStatus; 
+import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.reporting.AbstractReportingTask; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.SSLContextService.ClientAuth; + +@Tags({"provenance", "lineage", "tracking", "site", "site to site"}) +@CapabilityDescription("Publishes Provenance events using the Site To Site protocol.") +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart of MiNiFi the task knows where it left off.") +public class ProvenanceReportingTask extends AbstractReportingTask { + private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + private static final String LAST_EVENT_ID_KEY = "last_event_id"; + + static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder() + .name("Destination URL") + .description("The URL to post the Provenance Events to.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + static final PropertyDescriptor PORT_NAME = new PropertyDescriptor.Builder() + .name("Input Port Name") + .description("The name of the Input Port to delivery Provenance Events to.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor SSL_CONTEXT 
= new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The SSL Context Service to use when communicating with the destination. If not specified, communications will not be secure.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + static final PropertyDescriptor MINIFI_URL = new PropertyDescriptor.Builder() + .name("MiNiFi URL") + .description("The URL of this MiNiFi instance. This is used to include the Content URI to send to the destination.") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("http://${hostname(true)}:8080/nifi") + .addValidator(new NiFiUrlValidator()) + .build(); + static final PropertyDescriptor COMPRESS = new PropertyDescriptor.Builder() + .name("Compress Events") + .description("Indicates whether or not to compress the events when being sent.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .name("Communications Timeout") + .description("Specifies how long to wait to a response from the destination before deciding that an error has occurred and canceling the transaction") + .required(true) + .defaultValue("30 secs") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("Specifies how many records to send in a single batch, at most.") + .required(true) + .defaultValue("1000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + private volatile long firstEventId = -1L; + private volatile SiteToSiteClient siteToSiteClient; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(DESTINATION_URL); + properties.add(PORT_NAME); + properties.add(SSL_CONTEXT); + 
properties.add(MINIFI_URL); + properties.add(COMPRESS); + properties.add(TIMEOUT); + properties.add(BATCH_SIZE); + return properties; + } + + @OnScheduled + public void setup(final ConfigurationContext context) throws IOException { + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class); + final SSLContext sslContext = sslContextService == null ? null : sslContextService.createSSLContext(ClientAuth.REQUIRED); + final EventReporter eventReporter = new EventReporter() { + @Override + public void reportEvent(final Severity severity, final String category, final String message) { + switch (severity) { + case WARNING: + getLogger().warn(message); + break; + case ERROR: + getLogger().error(message); + break; + default: + break; + } + } + }; + + final String destinationUrlPrefix = context.getProperty(DESTINATION_URL).evaluateAttributeExpressions().getValue(); + final String destinationUrl = destinationUrlPrefix + (destinationUrlPrefix.endsWith("/") ? 
"nifi" : "/nifi"); + + siteToSiteClient = new SiteToSiteClient.Builder() + .url(destinationUrl) + .portName(context.getProperty(PORT_NAME).getValue()) + .useCompression(context.getProperty(COMPRESS).asBoolean()) + .eventReporter(eventReporter) + .sslContext(sslContext) + .timeout(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) + .build(); + } + + @OnStopped + public void shutdown() throws IOException { + final SiteToSiteClient client = getClient(); + if (client != null) { + client.close(); + } + } + + // this getter is intended explicitly for testing purposes + protected SiteToSiteClient getClient() { + return this.siteToSiteClient; + } + + private String getComponentName(final ProcessGroupStatus status, final ProvenanceEventRecord event) { + if (status == null) { + return null; + } + + final String componentId = event.getComponentId(); + if (status.getId().equals(componentId)) { + return status.getName(); + } + + for (final ProcessorStatus procStatus : status.getProcessorStatus()) { + if (procStatus.getId().equals(componentId)) { + return procStatus.getName(); + } + } + + for (final PortStatus portStatus : status.getInputPortStatus()) { + if (portStatus.getId().equals(componentId)) { + return portStatus.getName(); + } + } + + for (final PortStatus portStatus : status.getOutputPortStatus()) { + if (portStatus.getId().equals(componentId)) { + return portStatus.getName(); + } + } + + for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) { + if (rpgStatus.getId().equals(componentId)) { + return rpgStatus.getName(); + } + } + + for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) { + final String componentName = getComponentName(childGroup, event); + if (componentName != null) { + return componentName; + } + } + + return null; + } + + @Override + public void onTrigger(final ReportingContext context) { + final ProcessGroupStatus procGroupStatus = 
context.getEventAccess().getControllerStatus(); + final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName(); + + Long currMaxId = context.getEventAccess().getProvenanceRepository().getMaxEventId(); + + if (firstEventId < 0) { + Map<String, String> state; + try { + state = context.getStateManager().getState(Scope.LOCAL).toMap(); + } catch (IOException e) { + getLogger().error("Failed to get state at start up due to {}", e); + return; + } + if (state.containsKey(LAST_EVENT_ID_KEY)) { + firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1; + } + + if(currMaxId < firstEventId){ + getLogger().debug("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " + + "ids. Restarting querying from the beginning.", new Object[]{currMaxId, firstEventId}); + firstEventId = -1; + } + } + + if (currMaxId == (firstEventId - 1)) { + getLogger().debug("No events to send due to the current max id being equal to the last id that was queried."); + return; + } + + final List<ProvenanceEventRecord> events; + try { + events = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger()); + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve Provenance Events from repository due to {}", ioe); + return; + } + + if (events == null || events.isEmpty()) { + getLogger().debug("No events to send due to 'events' being null or empty."); + return; + } + + final long start = System.nanoTime(); + final Map<String, ?> config = Collections.emptyMap(); + final JsonBuilderFactory factory = Json.createBuilderFactory(config); + final JsonObjectBuilder builder = factory.createObjectBuilder(); + + final String nifiUrl = context.getProperty(MINIFI_URL).evaluateAttributeExpressions().getValue(); + URL url; + try { + url = new URL(nifiUrl); + } catch (final MalformedURLException e1) { + // already validated + 
throw new AssertionError(); + } + + final String hostname = url.getHost(); + + final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder(); + for (final ProvenanceEventRecord event : events) { + arrayBuilder.add(serialize(factory, builder, event, getComponentName(procGroupStatus, event), hostname, url, rootGroupName)); + } + final JsonArray jsonArray = arrayBuilder.build(); + + try { + final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); + if (transaction == null) { + getLogger().debug("All destination nodes are penalized; will attempt to send data later"); + return; + } + + final Map<String, String> attributes = new HashMap<>(); + final String transactionId = UUID.randomUUID().toString(); + attributes.put("reporting.task.transaction.id", transactionId); + + final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8); + transaction.send(data, attributes); + transaction.confirm(); + transaction.complete(); + + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}", + new Object[] {events.size(), transferMillis, transactionId, events.get(0).getEventId()}); + } catch (final IOException e) { + throw new ProcessException("Failed to send Provenance Events to destination due to IOException", e); + } + + final ProvenanceEventRecord lastEvent = events.get(events.size() - 1); + final String lastEventId = String.valueOf(lastEvent.getEventId()); + try { + StateManager stateManager = context.getStateManager(); + StateMap stateMap = stateManager.getState(Scope.LOCAL); + Map<String, String> newMapOfState = new HashMap<>(); + newMapOfState.put(LAST_EVENT_ID_KEY, lastEventId); + stateManager.replace(stateMap, newMapOfState, Scope.LOCAL); + } catch (final IOException ioe) { + getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent 
after a restart of MiNiFi", + new Object[] {lastEventId, ioe}, ioe); + } + + firstEventId = lastEvent.getEventId() + 1; + } + + static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, + final String componentName, final String hostname, final URL nifiUrl, final String applicationName) { + addField(builder, "eventId", UUID.randomUUID().toString()); + addField(builder, "eventOrdinal", event.getEventId()); + addField(builder, "eventType", event.getEventType().name()); + addField(builder, "timestampMillis", event.getEventTime()); + + final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT); + df.setTimeZone(TimeZone.getTimeZone("Z")); + addField(builder, "timestamp", df.format(event.getEventTime())); + + addField(builder, "durationMillis", event.getEventDuration()); + addField(builder, "lineageStart", event.getLineageStartDate()); + + final Set<String> lineageIdentifiers = new HashSet<>(); + if (event.getLineageIdentifiers() != null) { + lineageIdentifiers.addAll(event.getLineageIdentifiers()); + } + lineageIdentifiers.add(event.getFlowFileUuid()); + addField(builder, factory, "lineageIdentifiers", lineageIdentifiers); + addField(builder, "details", event.getDetails()); + addField(builder, "componentId", event.getComponentId()); + addField(builder, "componentType", event.getComponentType()); + addField(builder, "componentName", componentName); + addField(builder, "entityId", event.getFlowFileUuid()); + addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile"); + addField(builder, "entitySize", event.getFileSize()); + addField(builder, "previousEntitySize", event.getPreviousFileSize()); + addField(builder, factory, "updatedAttributes", event.getUpdatedAttributes()); + addField(builder, factory, "previousAttributes", event.getPreviousAttributes()); + + addField(builder, "actorHostname", hostname); + if (nifiUrl != null) { + final String urlPrefix = 
nifiUrl.toString().replace(nifiUrl.getPath(), ""); + final String contentUriBase = urlPrefix + "/nifi-api/controller/provenance/events/" + event.getEventId() + "/content/"; + addField(builder, "contentURI", contentUriBase + "output"); + addField(builder, "previousContentURI", contentUriBase + "input"); + } + + addField(builder, factory, "parentIds", event.getParentUuids()); + addField(builder, factory, "childIds", event.getChildUuids()); + addField(builder, "transitUri", event.getTransitUri()); + addField(builder, "remoteIdentifier", event.getSourceSystemFlowFileIdentifier()); + addField(builder, "alternateIdentifier", event.getAlternateIdentifierUri()); + addField(builder, "platform", "minifi"); + addField(builder, "application", applicationName); + + return builder.build(); + } + + private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map<String, String> values) { + if (values == null) { + return; + } + + final JsonObjectBuilder mapBuilder = factory.createObjectBuilder(); + for (final Map.Entry<String, String> entry : values.entrySet()) { + if (entry.getKey() == null || entry.getValue() == null) { + continue; + } + + mapBuilder.add(entry.getKey(), entry.getValue()); + } + + builder.add(key, mapBuilder); + } + + private static void addField(final JsonObjectBuilder builder, final String key, final Long value) { + if (value != null) { + builder.add(key, value.longValue()); + } + } + + private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection<String> values) { + if (values == null) { + return; + } + + builder.add(key, createJsonArray(factory, values)); + } + + private static void addField(final JsonObjectBuilder builder, final String key, final String value) { + if (value == null) { + return; + } + + builder.add(key, value); + } + + private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final 
Collection<String> values) { + final JsonArrayBuilder builder = factory.createArrayBuilder(); + for (final String value : values) { + if (value != null) { + builder.add(value); + } + } + return builder; + } + + + private static class NiFiUrlValidator implements Validator { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + final String value = context.newPropertyValue(input).evaluateAttributeExpressions().getValue(); + try { + new URL(value); + } catch (final Exception e) { + return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Not a valid URL").build(); + } + + return new ValidationResult.Builder().input(input).subject(subject).valid(true).build(); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask new file mode 100644 index 0000000..331d759 --- /dev/null +++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.minifi.provenance.reporting.ProvenanceReportingTask \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java new file mode 100644 index 0000000..97291c0 --- /dev/null +++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.provenance.reporting; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.provenance.ProvenanceEventBuilder; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.reporting.EventAccess; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.state.MockStateManager; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.MockFlowFile; +import 
org.apache.nifi.util.MockPropertyValue; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import javax.json.Json; +import javax.json.JsonObject; +import javax.json.JsonReader; + +public class TestProvenanceReportingTask { + + @Test + public void testSerializedForm() throws IOException, InitializationException { + final String uuid = "10000000-0000-0000-0000-000000000000"; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "xyz"); + attributes.put("xyz", "abc"); + attributes.put("filename", "file-" + uuid); + + final Map<String, String> prevAttrs = new HashMap<>(); + attributes.put("filename", "1234.xyz"); + + final Set<String> lineageIdentifiers = new HashSet<>(); + lineageIdentifiers.add("123"); + lineageIdentifiers.add("321"); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + attributes.put("uuid", uuid); + builder.fromFlowFile(createFlowFile(3L, attributes)); + builder.setAttributes(prevAttrs, attributes); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + builder.setLineageIdentifiers(lineageIdentifiers); + final ProvenanceEventRecord event = builder.build(); + + final List<byte[]> dataSent = new ArrayList<>(); + final ProvenanceReportingTask task = new ProvenanceReportingTask() { + @SuppressWarnings("unchecked") + @Override + protected SiteToSiteClient getClient() { + final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class); + final Transaction transaction = Mockito.mock(Transaction.class); + + try { + Mockito.doAnswer(new Answer<Object>() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + final byte[] data = invocation.getArgumentAt(0, 
byte[].class); + dataSent.add(data); + return null; + } + }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class)); + + Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction); + } catch (final Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + + return client; + } + }; + + final List<ProvenanceEventRecord> events = new ArrayList<>(); + events.add(event); + + final Map<PropertyDescriptor, String> properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.put(ProvenanceReportingTask.BATCH_SIZE, "1000"); + + final ReportingContext context = Mockito.mock(ReportingContext.class); + Mockito.when(context.getStateManager()) + .thenReturn(new MockStateManager(task)); + Mockito.doAnswer(new Answer<PropertyValue>() { + @Override + public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { + final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class); + return new MockPropertyValue(properties.get(descriptor), null); + } + }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); + + final EventAccess eventAccess = Mockito.mock(EventAccess.class); + Mockito.doAnswer(new Answer<List<ProvenanceEventRecord>>() { + @Override + public List<ProvenanceEventRecord> answer(final InvocationOnMock invocation) throws Throwable { + final long startId = invocation.getArgumentAt(0, long.class); + final int maxRecords = invocation.getArgumentAt(1, int.class); + + final List<ProvenanceEventRecord> eventsToReturn = new ArrayList<>(); + for (int i = (int) Math.max(0, startId); i < (int) (startId + maxRecords) && i < events.size(); i++) { + eventsToReturn.add(events.get(i)); + } + return eventsToReturn; + } + }).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt()); + + final 
ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class); + Mockito.doAnswer(new Answer<Long>() { + @Override + public Long answer(final InvocationOnMock invocation) throws Throwable { + return 1L; + } + }).when(provenanceRepository).getMaxEventId(); + + Mockito.when(context.getEventAccess()).thenReturn(eventAccess); + Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository); + + final ComponentLog logger = Mockito.mock(ComponentLog.class); + final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); + Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); + Mockito.when(initContext.getLogger()).thenReturn(logger); + + + task.initialize(initContext); + task.onTrigger(context); + + assertEquals(1, dataSent.size()); + final String msg = new String(dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonObject msgArray = jsonReader.readArray().getJsonObject(0).getJsonObject("updatedAttributes"); + assertEquals(msgArray.getString("abc"), events.get(0).getAttributes().get("abc")); + } + + public static FlowFile createFlowFile(final long id, final Map<String, String> attributes) { + MockFlowFile mockFlowFile = new MockFlowFile(id); + mockFlowFile.putAttributes(attributes); + return mockFlowFile; + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/minifi-provenance-reporting-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/pom.xml b/minifi-nar-bundles/minifi-provenance-reporting-bundle/pom.xml new file mode 100644 index 0000000..0d34004 --- /dev/null +++ b/minifi-nar-bundles/minifi-provenance-reporting-bundle/pom.xml @@ -0,0 +1,41 @@ +<?xml version="1.0"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi.minifi</groupId> + <artifactId>minifi-nar-bundles</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>minifi-provenance-reporting-bundle</artifactId> + <packaging>pom</packaging> + + <!-- Child modules built as part of this bundle. --> <modules> + <module>minifi-provenance-reporting-task</module> + <module>minifi-provenance-reporting-nar</module> + </modules> + + <!-- Pins jersey-client to 2.19 for modules inheriting from this POM; dependencyManagement only fixes the version and does not itself add the dependency. --> <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-client</artifactId> + <version>2.19</version> + </dependency> + </dependencies> + </dependencyManagement> +</project> http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/minifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/pom.xml b/minifi-nar-bundles/pom.xml index aeaad02..79a8414 100644 --- a/minifi-nar-bundles/pom.xml +++ b/minifi-nar-bundles/pom.xml @@ -20,10 +20,11 @@ <artifactId>minifi</artifactId>
<version>0.0.1-SNAPSHOT</version> </parent> - <groupId>org.apache.nifi</groupId> + <groupId>org.apache.nifi.minifi</groupId> <artifactId>minifi-nar-bundles</artifactId> <packaging>pom</packaging> <modules> <module>minifi-framework-bundle</module> + <module>minifi-provenance-reporting-bundle</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/61e3a925/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2c8a881..cdd8357 100644 --- a/pom.xml +++ b/pom.xml @@ -104,7 +104,7 @@ limitations under the License. </dependency> <dependency> <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> + <artifactId>mockito-all</artifactId> <scope>test</scope> </dependency> <dependency> @@ -345,6 +345,11 @@ limitations under the License. <artifactId>nifi-write-ahead-log</artifactId> <version>${org.apache.nifi.version}</version> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service-api</artifactId> + <version>${org.apache.nifi.version}</version> + </dependency> <!-- Test Dependencies --> <dependency> @@ -354,11 +359,6 @@ limitations under the License. </dependency> <dependency> <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <version>1.10.19</version> - </dependency> - <dependency> - <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <version>1.10.19</version> <scope>test</scope> @@ -368,6 +368,7 @@ limitations under the License. <artifactId>slf4j-simple</artifactId> <version>${org.slf4j.version}</version> </dependency> + </dependencies> </dependencyManagement>
