http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/52888507/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/src/main/resources/META-INF/NOTICE b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/src/main/resources/META-INF/NOTICE deleted file mode 100644 index be55e59..0000000 --- a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-nar/src/main/resources/META-INF/NOTICE +++ /dev/null @@ -1,15 +0,0 @@ -minifi-provenance-reporting-task-nar -Copyright 2015-2016 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). - -************************ -Common Development and Distribution License 1.1 -************************ - -The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details. - - (CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net) - (CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net) -
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/52888507/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/pom.xml b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/pom.xml deleted file mode 100644 index 837b94f..0000000 --- a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/pom.xml +++ /dev/null @@ -1,82 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi.minifi</groupId> - <artifactId>minifi-provenance-reporting-bundle</artifactId> - <version>0.1.0-SNAPSHOT</version> - </parent> - - <artifactId>minifi-provenance-reporting-task</artifactId> - <description>Publishes MiNiFi metrics to NiFi via S2S</description> - <version>0.1.0-SNAPSHOT</version> - - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-processor-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-ssl-context-service-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-site-to-site-client</artifactId> - </dependency> - <dependency> - <groupId>org.glassfish</groupId> - <artifactId>javax.json</artifactId> - <version>1.0.4</version> - </dependency> - <dependency> - <groupId>javax.json</groupId> - <artifactId>javax.json-api</artifactId> - <version>1.0</version> - </dependency> - - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-data-provenance-utils</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-mock</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.12</version> - <scope>test</scope> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/52888507/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java deleted file mode 100644 index a0d1776..0000000 --- a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java +++ /dev/null @@ -1,462 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.minifi.provenance.reporting; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import javax.json.Json; -import javax.json.JsonArray; -import javax.json.JsonArrayBuilder; -import javax.json.JsonBuilderFactory; -import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; -import javax.net.ssl.SSLContext; - -import org.apache.nifi.annotation.behavior.Stateful; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.components.state.Scope; -import org.apache.nifi.components.state.StateManager; -import org.apache.nifi.components.state.StateMap; -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.controller.status.PortStatus; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.ProcessorStatus; -import org.apache.nifi.controller.status.RemoteProcessGroupStatus; -import org.apache.nifi.events.EventReporter; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.provenance.ProvenanceEventRecord; -import org.apache.nifi.remote.Transaction; -import org.apache.nifi.remote.TransferDirection; -import org.apache.nifi.remote.client.SiteToSiteClient; -import org.apache.nifi.reporting.AbstractReportingTask; -import org.apache.nifi.reporting.ReportingContext; -import org.apache.nifi.reporting.Severity; -import org.apache.nifi.ssl.SSLContextService; -import org.apache.nifi.ssl.SSLContextService.ClientAuth; - -@Tags({"provenance", "lineage", "tracking", "site", "site to site"}) -@CapabilityDescription("Publishes Provenance events using the Site To Site protocol.") -@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart of MiNiFi the task knows where it left off.") -public class ProvenanceReportingTask extends AbstractReportingTask { - private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; - private static final String LAST_EVENT_ID_KEY = "last_event_id"; - - static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder() - .name("Destination URL") - .description("The URL to post the Provenance Events to.") - .required(true) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.URL_VALIDATOR) - .build(); - static final PropertyDescriptor PORT_NAME = new PropertyDescriptor.Builder() - .name("Input Port Name") - .description("The name of the Input Port to delivery Provenance Events to.") - .required(true) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder() - .name("SSL Context Service") - .description("The SSL Context Service to use when communicating with the destination. If not specified, communications will not be secure.") - .required(false) - .identifiesControllerService(SSLContextService.class) - .build(); - static final PropertyDescriptor MINIFI_URL = new PropertyDescriptor.Builder() - .name("MiNiFi URL") - .description("The URL of this MiNiFi instance. This is used to include the Content URI to send to the destination.") - .required(true) - .expressionLanguageSupported(true) - .defaultValue("http://${hostname(true)}:8080/nifi") - .addValidator(new NiFiUrlValidator()) - .build(); - static final PropertyDescriptor COMPRESS = new PropertyDescriptor.Builder() - .name("Compress Events") - .description("Indicates whether or not to compress the events when being sent.") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); - static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() - .name("Communications Timeout") - .description("Specifies how long to wait to a response from the destination before deciding that an error has occurred and canceling the transaction") - .required(true) - .defaultValue("30 secs") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Batch Size") - .description("Specifies how many records to send in a single batch, at most.") - .required(true) - .defaultValue("1000") - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .build(); - - private volatile long firstEventId = -1L; - private volatile SiteToSiteClient siteToSiteClient; - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(DESTINATION_URL); - properties.add(PORT_NAME); - properties.add(SSL_CONTEXT); - properties.add(MINIFI_URL); - properties.add(COMPRESS); - properties.add(TIMEOUT); - properties.add(BATCH_SIZE); - return properties; - } - - @OnScheduled - public void setup(final ConfigurationContext context) throws IOException { - final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class); - final SSLContext sslContext = sslContextService == null ? null : sslContextService.createSSLContext(ClientAuth.REQUIRED); - final EventReporter eventReporter = new EventReporter() { - @Override - public void reportEvent(final Severity severity, final String category, final String message) { - switch (severity) { - case WARNING: - getLogger().warn(message); - break; - case ERROR: - getLogger().error(message); - break; - default: - break; - } - } - }; - - final String destinationUrlPrefix = context.getProperty(DESTINATION_URL).evaluateAttributeExpressions().getValue(); - final String destinationUrl = destinationUrlPrefix + (destinationUrlPrefix.endsWith("/") ? "nifi" : "/nifi"); - - siteToSiteClient = new SiteToSiteClient.Builder() - .url(destinationUrl) - .portName(context.getProperty(PORT_NAME).getValue()) - .useCompression(context.getProperty(COMPRESS).asBoolean()) - .eventReporter(eventReporter) - .sslContext(sslContext) - .timeout(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) - .build(); - } - - @OnStopped - public void shutdown() throws IOException { - final SiteToSiteClient client = getClient(); - if (client != null) { - client.close(); - } - } - - // this getter is intended explicitly for testing purposes - protected SiteToSiteClient getClient() { - return this.siteToSiteClient; - } - - private String getComponentName(final ProcessGroupStatus status, final ProvenanceEventRecord event) { - if (status == null) { - return null; - } - - final String componentId = event.getComponentId(); - if (status.getId().equals(componentId)) { - return status.getName(); - } - - for (final ProcessorStatus procStatus : status.getProcessorStatus()) { - if (procStatus.getId().equals(componentId)) { - return procStatus.getName(); - } - } - - for (final PortStatus portStatus : status.getInputPortStatus()) { - if (portStatus.getId().equals(componentId)) { - return portStatus.getName(); - } - } - - for (final PortStatus portStatus : status.getOutputPortStatus()) { - if (portStatus.getId().equals(componentId)) { - return portStatus.getName(); - } - } - - for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) { - if (rpgStatus.getId().equals(componentId)) { - return rpgStatus.getName(); - } - } - - for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) { - final String componentName = getComponentName(childGroup, event); - if (componentName != null) { - return componentName; - } - } - - return null; - } - - @Override - public void onTrigger(final ReportingContext context) { - final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus(); - final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName(); - - Long currMaxId = context.getEventAccess().getProvenanceRepository().getMaxEventId(); - - if(currMaxId == null) { - getLogger().debug("No events to send because no events have been created yet."); - return; - } - - if (firstEventId < 0) { - Map<String, String> state; - try { - state = context.getStateManager().getState(Scope.LOCAL).toMap(); - } catch (IOException e) { - getLogger().error("Failed to get state at start up due to {}:"+e.getMessage(), e); - return; - } - if (state.containsKey(LAST_EVENT_ID_KEY)) { - firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1; - } - - if(currMaxId < firstEventId){ - getLogger().debug("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " + - "ids. Restarting querying from the beginning.", new Object[]{currMaxId, firstEventId}); - firstEventId = -1; - } - } - - if (currMaxId == (firstEventId - 1)) { - getLogger().debug("No events to send due to the current max id being equal to the last id that was queried."); - return; - } - - final List<ProvenanceEventRecord> events; - try { - events = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger()); - } catch (final IOException ioe) { - getLogger().error("Failed to retrieve Provenance Events from repository due to {}:"+ioe.getMessage(), ioe); - return; - } - - if (events == null || events.isEmpty()) { - getLogger().debug("No events to send due to 'events' being null or empty."); - return; - } - - final long start = System.nanoTime(); - final Map<String, ?> config = Collections.emptyMap(); - final JsonBuilderFactory factory = Json.createBuilderFactory(config); - final JsonObjectBuilder builder = factory.createObjectBuilder(); - - final String nifiUrl = context.getProperty(MINIFI_URL).evaluateAttributeExpressions().getValue(); - URL url; - try { - url = new URL(nifiUrl); - } catch (final MalformedURLException e1) { - // already validated - throw new AssertionError(); - } - - final String hostname = url.getHost(); - - final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder(); - for (final ProvenanceEventRecord event : events) { - arrayBuilder.add(serialize(factory, builder, event, getComponentName(procGroupStatus, event), hostname, url, rootGroupName)); - } - final JsonArray jsonArray = arrayBuilder.build(); - - try { - final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); - if (transaction == null) { - getLogger().debug("All destination nodes are penalized; will attempt to send data later"); - return; - } - - final Map<String, String> attributes = new HashMap<>(); - final String transactionId = UUID.randomUUID().toString(); - attributes.put("reporting.task.transaction.id", transactionId); - - final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8); - transaction.send(data, attributes); - transaction.confirm(); - transaction.complete(); - - final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}", - new Object[] {events.size(), transferMillis, transactionId, events.get(0).getEventId()}); - } catch (final IOException e) { - throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e); - } - - final ProvenanceEventRecord lastEvent = events.get(events.size() - 1); - final String lastEventId = String.valueOf(lastEvent.getEventId()); - try { - StateManager stateManager = context.getStateManager(); - StateMap stateMap = stateManager.getState(Scope.LOCAL); - Map<String, String> newMapOfState = new HashMap<>(); - newMapOfState.put(LAST_EVENT_ID_KEY, lastEventId); - stateManager.replace(stateMap, newMapOfState, Scope.LOCAL); - } catch (final IOException ioe) { - getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart of MiNiFi. The message of {} was: {}", - new Object[] {lastEventId, ioe, ioe, ioe.getMessage()}, ioe); - } - - firstEventId = lastEvent.getEventId() + 1; - } - - static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, - final String componentName, final String hostname, final URL nifiUrl, final String applicationName) { - addField(builder, "eventId", UUID.randomUUID().toString()); - addField(builder, "eventOrdinal", event.getEventId()); - addField(builder, "eventType", event.getEventType().name()); - addField(builder, "timestampMillis", event.getEventTime()); - - final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT); - df.setTimeZone(TimeZone.getTimeZone("Z")); - addField(builder, "timestamp", df.format(event.getEventTime())); - - addField(builder, "durationMillis", event.getEventDuration()); - addField(builder, "lineageStart", event.getLineageStartDate()); - - final Set<String> lineageIdentifiers = new HashSet<>(); - if (event.getLineageIdentifiers() != null) { - lineageIdentifiers.addAll(event.getLineageIdentifiers()); - } - lineageIdentifiers.add(event.getFlowFileUuid()); - addField(builder, factory, "lineageIdentifiers", lineageIdentifiers); - addField(builder, "details", event.getDetails()); - addField(builder, "componentId", event.getComponentId()); - addField(builder, "componentType", event.getComponentType()); - addField(builder, "componentName", componentName); - addField(builder, "entityId", event.getFlowFileUuid()); - addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile"); - addField(builder, "entitySize", event.getFileSize()); - addField(builder, "previousEntitySize", event.getPreviousFileSize()); - addField(builder, factory, "updatedAttributes", event.getUpdatedAttributes()); - addField(builder, factory, "previousAttributes", event.getPreviousAttributes()); - - addField(builder, "actorHostname", hostname); - if (nifiUrl != null) { - final String urlPrefix = nifiUrl.toString().replace(nifiUrl.getPath(), ""); - final String contentUriBase = urlPrefix + "/nifi-api/controller/provenance/events/" + event.getEventId() + "/content/"; - addField(builder, "contentURI", contentUriBase + "output"); - addField(builder, "previousContentURI", contentUriBase + "input"); - } - - addField(builder, factory, "parentIds", event.getParentUuids()); - addField(builder, factory, "childIds", event.getChildUuids()); - addField(builder, "transitUri", event.getTransitUri()); - addField(builder, "remoteIdentifier", event.getSourceSystemFlowFileIdentifier()); - addField(builder, "alternateIdentifier", event.getAlternateIdentifierUri()); - addField(builder, "platform", "minifi"); - addField(builder, "application", applicationName); - - return builder.build(); - } - - private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map<String, String> values) { - if (values == null) { - return; - } - - final JsonObjectBuilder mapBuilder = factory.createObjectBuilder(); - for (final Map.Entry<String, String> entry : values.entrySet()) { - if (entry.getKey() == null || entry.getValue() == null) { - continue; - } - - mapBuilder.add(entry.getKey(), entry.getValue()); - } - - builder.add(key, mapBuilder); - } - - private static void addField(final JsonObjectBuilder builder, final String key, final Long value) { - if (value != null) { - builder.add(key, value.longValue()); - } - } - - private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection<String> values) { - if (values == null) { - return; - } - - builder.add(key, createJsonArray(factory, values)); - } - - private static void addField(final JsonObjectBuilder builder, final String key, final String value) { - if (value == null) { - return; - } - - builder.add(key, value); - } - - private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection<String> values) { - final JsonArrayBuilder builder = factory.createArrayBuilder(); - for (final String value : values) { - if (value != null) { - builder.add(value); - } - } - return builder; - } - - - private static class NiFiUrlValidator implements Validator { - @Override - public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - final String value = context.newPropertyValue(input).evaluateAttributeExpressions().getValue(); - try { - new URL(value); - } catch (final Exception e) { - return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Not a valid URL").build(); - } - - return new ValidationResult.Builder().input(input).subject(subject).valid(true).build(); - } - } -} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/52888507/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask deleted file mode 100644 index 331d759..0000000 --- a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.nifi.minifi.provenance.reporting.ProvenanceReportingTask \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/52888507/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java b/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java deleted file mode 100644 index 97291c0..0000000 --- a/minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.minifi.provenance.reporting; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.provenance.ProvenanceEventBuilder; -import org.apache.nifi.provenance.ProvenanceEventRecord; -import org.apache.nifi.provenance.ProvenanceEventRepository; -import org.apache.nifi.provenance.ProvenanceEventType; -import org.apache.nifi.provenance.StandardProvenanceEventRecord; -import org.apache.nifi.remote.Transaction; -import org.apache.nifi.remote.TransferDirection; -import org.apache.nifi.remote.client.SiteToSiteClient; -import org.apache.nifi.reporting.EventAccess; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.reporting.ReportingContext; -import org.apache.nifi.reporting.ReportingInitializationContext; -import org.apache.nifi.state.MockStateManager; -import org.apache.nifi.stream.io.ByteArrayInputStream; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.MockPropertyValue; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import javax.json.Json; -import javax.json.JsonObject; -import javax.json.JsonReader; - -public class TestProvenanceReportingTask { - - @Test - public void testSerializedForm() throws IOException, InitializationException { - final String uuid = "10000000-0000-0000-0000-000000000000"; - final Map<String, String> attributes = new HashMap<>(); - attributes.put("abc", "xyz"); - attributes.put("xyz", "abc"); - attributes.put("filename", "file-" + uuid); - - final Map<String, String> prevAttrs = new HashMap<>(); - attributes.put("filename", "1234.xyz"); - - final Set<String> lineageIdentifiers = new HashSet<>(); - lineageIdentifiers.add("123"); - lineageIdentifiers.add("321"); - - final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); - builder.setEventTime(System.currentTimeMillis()); - builder.setEventType(ProvenanceEventType.RECEIVE); - builder.setTransitUri("nifi://unit-test"); - attributes.put("uuid", uuid); - builder.fromFlowFile(createFlowFile(3L, attributes)); - builder.setAttributes(prevAttrs, attributes); - builder.setComponentId("1234"); - builder.setComponentType("dummy processor"); - builder.setLineageIdentifiers(lineageIdentifiers); - final ProvenanceEventRecord event = builder.build(); - - final List<byte[]> dataSent = new ArrayList<>(); - final ProvenanceReportingTask task = new ProvenanceReportingTask() { - @SuppressWarnings("unchecked") - @Override - protected SiteToSiteClient getClient() { - final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class); - final Transaction transaction = Mockito.mock(Transaction.class); - - try { - Mockito.doAnswer(new Answer<Object>() { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable { - final byte[] data = invocation.getArgumentAt(0, byte[].class); - dataSent.add(data); - return null; - } - }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class)); - - Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction); - } catch (final Exception e) { - e.printStackTrace(); - Assert.fail(e.toString()); - } - - return client; - } - }; - - final List<ProvenanceEventRecord> events = new ArrayList<>(); - events.add(event); - - final Map<PropertyDescriptor, String> properties = new HashMap<>(); - for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { - properties.put(descriptor, descriptor.getDefaultValue()); - } - properties.put(ProvenanceReportingTask.BATCH_SIZE, "1000"); - - final ReportingContext context = Mockito.mock(ReportingContext.class); - Mockito.when(context.getStateManager()) - .thenReturn(new MockStateManager(task)); - Mockito.doAnswer(new Answer<PropertyValue>() { - @Override - public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { - final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class); - return new MockPropertyValue(properties.get(descriptor), null); - } - }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); - - final EventAccess eventAccess = Mockito.mock(EventAccess.class); - Mockito.doAnswer(new Answer<List<ProvenanceEventRecord>>() { - @Override - public List<ProvenanceEventRecord> answer(final InvocationOnMock invocation) throws Throwable { - final long startId = invocation.getArgumentAt(0, long.class); - final int maxRecords = invocation.getArgumentAt(1, int.class); - - final List<ProvenanceEventRecord> eventsToReturn = new ArrayList<>(); - for (int i = (int) Math.max(0, startId); i < (int) (startId + maxRecords) && i < events.size(); i++) { - eventsToReturn.add(events.get(i)); - } - return eventsToReturn; - } - }).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt()); - - final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class); - Mockito.doAnswer(new Answer<Long>() { - @Override - public Long answer(final InvocationOnMock invocation) throws Throwable { - return 1L; - } - }).when(provenanceRepository).getMaxEventId(); - - Mockito.when(context.getEventAccess()).thenReturn(eventAccess); - Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository); - - final ComponentLog logger = Mockito.mock(ComponentLog.class); - final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); - Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); - Mockito.when(initContext.getLogger()).thenReturn(logger); - - - task.initialize(initContext); - task.onTrigger(context); - - assertEquals(1, dataSent.size()); - final String msg = new String(dataSent.get(0), StandardCharsets.UTF_8); - JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); - JsonObject msgArray = jsonReader.readArray().getJsonObject(0).getJsonObject("updatedAttributes"); - assertEquals(msgArray.getString("abc"), events.get(0).getAttributes().get("abc")); - } - - public static FlowFile createFlowFile(final long id, final Map<String, String> attributes) { - MockFlowFile mockFlowFile = new MockFlowFile(id); - mockFlowFile.putAttributes(attributes); - return mockFlowFile; - } -} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/52888507/minifi-nar-bundles/minifi-provenance-reporting-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-reporting-bundle/pom.xml b/minifi-nar-bundles/minifi-provenance-reporting-bundle/pom.xml deleted file mode 100644 index f3e70c7..0000000 --- a/minifi-nar-bundles/minifi-provenance-reporting-bundle/pom.xml +++ /dev/null @@ -1,46 +0,0 @@ -<?xml version="1.0"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi.minifi</groupId> - <artifactId>minifi-nar-bundles</artifactId> - <version>0.1.0-SNAPSHOT</version> - </parent> - - <artifactId>minifi-provenance-reporting-bundle</artifactId> - <packaging>pom</packaging> - - <modules> - <module>minifi-provenance-reporting-task</module> - <module>minifi-provenance-reporting-nar</module> - </modules> - - <dependencyManagement> - <dependencies> - <dependency> - <groupId>org.apache.nifi.minifi</groupId> - <artifactId>minifi-provenance-reporting-task</artifactId> - <version>0.1.0-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>org.glassfish.jersey.core</groupId> - <artifactId>jersey-client</artifactId> - <version>2.19</version> - </dependency> - </dependencies> - </dependencyManagement> -</project> http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/52888507/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java index ea66c38..a015dca 100644 --- a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java +++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepository.java @@ -54,11 +54,14 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.events.EventReporter; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.provenance.expiration.ExpirationAction; import org.apache.nifi.provenance.expiration.FileRemovalAction; -import org.apache.nifi.provenance.lucene.LuceneUtil; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; import org.apache.nifi.provenance.search.Query; import org.apache.nifi.provenance.search.QuerySubmission; import org.apache.nifi.provenance.search.SearchableField; @@ -77,7 +80,7 @@ import org.slf4j.LoggerFactory; // TODO: When API, FlowController, and supporting classes are refactored/reimplemented migrate this class and its accompanying imports to minifi package structure -public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepository { +public class MiNiFiPersistentProvenanceRepository implements ProvenanceRepository { public static final String EVENT_CATEGORY = "Provenance Repository"; private static final String FILE_EXTENSION = ".prov"; @@ -132,7 +135,20 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo private Long maxId = null; public MiNiFiPersistentProvenanceRepository() throws IOException { - this(createRepositoryConfiguration(), 10000); + maxPartitionMillis = 0; + maxPartitionBytes = 0; + writers = null; + configuration = null; + alwaysSync = false; + rolloverCheckMillis = 0; + maxAttributeChars = 0; + scheduledExecService = null; + rolloverExecutor = null; + eventReporter = null; + } + + public MiNiFiPersistentProvenanceRepository(final NiFiProperties nifiProperties) throws IOException { + this(createRepositoryConfiguration(nifiProperties), 10000); } public MiNiFiPersistentProvenanceRepository(final RepositoryConfiguration configuration, final int rolloverCheckMillis) throws IOException { @@ -170,7 +186,7 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo } @Override - public void initialize(final EventReporter eventReporter) throws IOException { + public void initialize(final EventReporter eventReporter, final Authorizer authorizer, final ProvenanceAuthorizableFactory resourceFactory) throws IOException { writeLock.lock(); try { if (initialized.getAndSet(true)) { @@ -237,8 +253,7 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo } } - private static RepositoryConfiguration createRepositoryConfiguration() throws IOException { - final NiFiProperties properties = NiFiProperties.getInstance(); + private static RepositoryConfiguration createRepositoryConfiguration(final NiFiProperties properties) throws IOException { final Map<String, Path> storageDirectories = properties.getProvenanceRepositoryPaths(); if (storageDirectories.isEmpty()) { storageDirectories.put("provenance_repository", Paths.get("provenance_repository")); @@ -337,6 +352,11 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo } @Override + public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords, final NiFiUser user) throws IOException { + throw new MethodNotSupportedException("Cannot list events for a specified user."); + } + + @Override public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords) throws IOException { final List<ProvenanceEventRecord> records = new ArrayList<>(maxRecords); @@ -511,7 +531,7 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo continue; } - final String basename = LuceneUtil.substringBefore(recoveredJournal.getName(), "."); + final String basename = StringUtils.substringBefore(recoveredJournal.getName(), "."); try { final long minId = Long.parseLong(basename); @@ -586,7 +606,7 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo final int numDirty = dirtyWriterCount.get(); if (numDirty >= recordWriters.length) { throw new IllegalStateException("Cannot update repository because all partitions are unusable at this time. Writing to the repository would cause corruption. " - + "This most often happens as a result of the repository running out of disk space or the JMV running out of memory."); + + "This most often happens as a result of the repository running out of disk space or the JMV running out of memory."); } final long idx = writerIndex.getAndIncrement(); @@ -754,8 +774,8 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo final Comparator<File> sortByBasenameComparator = new Comparator<File>() { @Override public int compare(final File o1, final File o2) { - final String baseName1 = LuceneUtil.substringBefore(o1.getName(), "."); - final String baseName2 = LuceneUtil.substringBefore(o2.getName(), "."); + final String baseName1 = StringUtils.substringBefore(o1.getName(), "."); + final String baseName2 = StringUtils.substringBefore(o2.getName(), "."); Long id1 = null; Long id2 = null; @@ -809,7 +829,7 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo // Age off the data. final Set<String> removed = new LinkedHashSet<>(); for (File file : uniqueFilesToPurge) { - final String baseName = LuceneUtil.substringBefore(file.getName(), "."); + final String baseName = StringUtils.substringBefore(file.getName(), "."); ExpirationAction currentAction = null; try { for (final ExpirationAction action : expirationActions) { @@ -826,15 +846,15 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo removed.add(baseName); } catch (final FileNotFoundException fnf) { logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} because the file no longer exists; will not " - + "perform additional Expiration Actions on this file", currentAction, file); + + "perform additional Expiration Actions on this file", currentAction, file); removed.add(baseName); } catch (final Throwable t) { logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} due to {}; will not perform additional " - + "Expiration Actions on this file at this time", currentAction, file, t.toString()); + + "Expiration Actions on this file at this time", currentAction, file, t.toString()); logger.warn("", t); eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction + - " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions " + - "on this file at this time"); + " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions " + + "on this file at this time"); } } @@ -852,7 +872,7 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo while (itr.hasNext()) { final Map.Entry<Long, Path> entry = itr.next(); final String filename = entry.getValue().toFile().getName(); - final String baseName = LuceneUtil.substringBefore(filename, "."); + final String baseName = StringUtils.substringBefore(filename, "."); if (removed.contains(baseName)) { itr.remove(); @@ -972,7 +992,7 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo if (journalsToMerge.isEmpty()) { logger.debug("No journals to merge; all RecordWriters were already closed"); } else { - logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), ".")); + logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), StringUtils.substringBefore(journalsToMerge.get(0).getName(), ".")); } } @@ -1010,7 +1030,7 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo // We need to make sure that another thread doesn't also update the map at the same time. We cannot // use the write lock when purging old events, and we want to use the same approach here. boolean updated = false; - final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), ".")); + final Long fileFirstEventId = Long.valueOf(StringUtils.substringBefore(fileRolledOver.getName(), ".")); while (!updated) { final SortedMap<Long, Path> existingPathMap = idToPathMap.get(); final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator()); @@ -1064,10 +1084,10 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo // that is no longer the case. if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) { logger.warn("The rate of the dataflow is exceeding the provenance recording rate. " - + "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and " - + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold); + + "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and " + + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold); eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is " - + "exceeding the provenance recording rate. Slowing down flow to accommodate"); + + "exceeding the provenance recording rate. Slowing down flow to accommodate"); while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) { // if a shutdown happens while we are in this loop, kill the rollover thread and break @@ -1097,15 +1117,15 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo } logger.debug("Provenance Repository is still behind. Keeping flow slowed down " - + "to accommodate. Currently, there are {} journal files ({} bytes) and " - + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold); + + "to accommodate. Currently, there are {} journal files ({} bytes) and " + + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold); journalFileCount = getJournalCount(); repoSize = getSize(getLogFiles(), 0L); } logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of " - + "journal files to be rolled over is {}", journalFileCount); + + "journal files to be rolled over is {}", journalFileCount); } // we've finished rolling over successfully. Create new writers and reset state. @@ -1143,7 +1163,7 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo continue; } - final String basename = LuceneUtil.substringBefore(journalFile.getName(), "."); + final String basename = StringUtils.substringBefore(journalFile.getName(), "."); List<File> files = journalMap.get(basename); if (files == null) { files = new ArrayList<>(); @@ -1171,7 +1191,7 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo // verify that all Journal files have the same basename String canonicalBaseName = null; for (final File journal : journalFiles) { - final String basename = LuceneUtil.substringBefore(journal.getName(), "."); + final String basename = StringUtils.substringBefore(journal.getName(), "."); if (canonicalBaseName == null) { canonicalBaseName = basename; } @@ -1203,7 +1223,6 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo * @throws IOException if a problem occurs writing to the mergedFile, reading from a journal */ File mergeJournals(final List<File> journalFiles, final File suggestedMergeFile, final EventReporter eventReporter) throws IOException { - logger.warn("Merging {} to {}", journalFiles, suggestedMergeFile); if (this.closed.get()) { logger.info("Provenance Repository has been closed; will not merge journal files to {}", suggestedMergeFile); return null; @@ -1217,8 +1236,8 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo Collections.sort(journalFiles, new Comparator<File>() { @Override public int compare(final File o1, final File o2) { - final String suffix1 = LuceneUtil.substringAfterLast(o1.getName(), "."); - final String suffix2 = LuceneUtil.substringAfterLast(o2.getName(), "."); + final String suffix1 = StringUtils.substringAfterLast(o1.getName(), "."); + final String suffix2 = StringUtils.substringAfterLast(o2.getName(), "."); try { final int journalIndex1 = Integer.parseInt(suffix1); @@ -1231,7 +1250,7 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo }); final String firstJournalFile = journalFiles.get(0).getName(); - final String firstFileSuffix = LuceneUtil.substringAfterLast(firstJournalFile, "."); + final String firstFileSuffix = StringUtils.substringAfterLast(firstJournalFile, "."); final boolean allPartialFiles = firstFileSuffix.equals("0"); // check if we have all of the "partial" files for the journal. @@ -1240,26 +1259,26 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo // we have all "partial" files and there is already a merged file. Delete the data from the index // because the merge file may not be fully merged. We will re-merge. logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist " - + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency."); + + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency."); // Since we only store the file's basename, block offset, and event ID, and because the newly created file could end up on // a different Storage Directory than the original, we need to ensure that we delete both the partially merged // file and the TOC file. Otherwise, we could get the wrong copy and have issues retrieving events. if (!suggestedMergeFile.delete()) { logger.error("Failed to delete partially written Provenance Journal File {}. This may result in events from this journal " - + "file not being able to be displayed. This file should be deleted manually.", suggestedMergeFile); + + "file not being able to be displayed. This file should be deleted manually.", suggestedMergeFile); } final File tocFile = TocUtil.getTocFile(suggestedMergeFile); if (tocFile.exists() && !tocFile.delete()) { logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. " - + "This can be corrected by manually deleting the {} file", tocFile, suggestedMergeFile, tocFile); + + "This can be corrected by manually deleting the {} file", tocFile, suggestedMergeFile, tocFile); } } } else { logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' " - + "but it did not; assuming that the files were already merged but only some finished deletion " - + "before restart. Deleting remaining partial journal files.", journalFiles); + + "but it did not; assuming that the files were already merged but only some finished deletion " + + "before restart. Deleting remaining partial journal files.", journalFiles); for (final File file : journalFiles) { if (!file.delete() && file.exists()) { @@ -1320,14 +1339,14 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo } catch (final EOFException eof) { } catch (final Exception e) { logger.warn("Failed to generate Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't " - + "completely written to the file. This record will be skipped."); + + "completely written to the file. This record will be skipped."); if (logger.isDebugEnabled()) { logger.warn("", e); } if (eventReporter != null) { eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e + - "; it's possible that hte record wasn't completely written to the file. This record will be skipped."); + "; it's possible that hte record wasn't completely written to the file. This record will be skipped."); } } @@ -1393,7 +1412,7 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo if (eventReporter != null) { eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + - journalFile.getAbsolutePath() + "; this file should be cleaned up manually"); + journalFile.getAbsolutePath() + "; this file should be cleaned up manually"); } } @@ -1403,7 +1422,7 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo if (eventReporter != null) { eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal TOC file " + - tocFile.getAbsolutePath() + "; this file should be cleaned up manually"); + tocFile.getAbsolutePath() + "; this file should be cleaned up manually"); } } } @@ -1598,12 +1617,12 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo } @Override - public QuerySubmission submitQuery(Query query) { + public QuerySubmission submitQuery(Query query, NiFiUser niFiUser) { throw new MethodNotSupportedException("Querying and indexing is not available for implementation " + this.getClass().getName()); } @Override - public QuerySubmission retrieveQuerySubmission(String queryIdentifier) { + public QuerySubmission retrieveQuerySubmission(String queryIdentifier, NiFiUser niFiUser) { throw new MethodNotSupportedException("Querying and indexing is not available for implementation " + this.getClass().getName()); } @@ -1618,23 +1637,37 @@ public class MiNiFiPersistentProvenanceRepository implements ProvenanceEventRepo } @Override - public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid) { + public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid, NiFiUser niFiUser) { + throw new MethodNotSupportedException("Computation of lineage is not available for implementation " + this.getClass().getName()); + } + + @Override + public ComputeLineageSubmission submitLineageComputation(long eventId, NiFiUser user) { throw new MethodNotSupportedException("Computation of lineage is not available for implementation " + this.getClass().getName()); } @Override - public AsyncLineageSubmission submitExpandChildren(final long eventId) { + public AsyncLineageSubmission submitExpandChildren(final long eventId, NiFiUser niFiUser) { throw new MethodNotSupportedException("Computation of lineage is not available for implementation " + this.getClass().getName()); } @Override - public AsyncLineageSubmission submitExpandParents(final long eventId) { + public AsyncLineageSubmission submitExpandParents(final long eventId, NiFiUser niFiUser) { throw new MethodNotSupportedException("Computation of lineage is not available for implementation " + this.getClass().getName()); } @Override - public AsyncLineageSubmission retrieveLineageSubmission(final String lineageIdentifier) { + public AsyncLineageSubmission retrieveLineageSubmission(final String lineageIdentifier, NiFiUser niFiUser) { throw new MethodNotSupportedException("Computation of lineage is not available for implementation " + this.getClass().getName()); } + @Override + public ProvenanceEventRecord getEvent(final long id, final NiFiUser user) throws IOException { + throw new MethodNotSupportedException("Cannot handle user authorization requests."); + } + + @Override + public ProvenanceEventRepository getProvenanceEventRepository() { + return this; + } } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/52888507/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository new file mode 100644 index 0000000..71f8c06 --- /dev/null +++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceRepository @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.provenance.MiNiFiPersistentProvenanceRepository \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/52888507/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java index 118a2a6..fbeeeb6 100644 --- a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java +++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java @@ -148,7 +148,7 @@ public class TestMiNiFiPersistentProvenanceRepository { config.setMaxEventFileCapacity(1L); config.setMaxEventFileLife(1, TimeUnit.SECONDS); repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final Map<String, String> attributes = new HashMap<>(); attributes.put("abc", "xyz"); @@ -176,7 +176,7 @@ public class TestMiNiFiPersistentProvenanceRepository { Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.) repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, 12); Assert.assertEquals("Did not establish the correct, Max Event Id through recovery after reloading", 9, repo.getMaxEventId().intValue()); @@ -198,7 +198,7 @@ public class TestMiNiFiPersistentProvenanceRepository { config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS); config.setCompressOnRollover(true); repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -236,7 +236,7 @@ public class TestMiNiFiPersistentProvenanceRepository { config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields())); repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final String uuid = "00000000-0000-0000-0000-000000000001"; final Map<String, String> attributes = new HashMap<>(); @@ -244,7 +244,7 @@ public class TestMiNiFiPersistentProvenanceRepository { attributes.put("uuid", uuid); attributes.put("filename", "file-" + uuid); - repo.submitLineageComputation(uuid); + repo.submitLineageComputation(uuid, null); } @@ -253,7 +253,7 @@ public class TestMiNiFiPersistentProvenanceRepository { final RepositoryConfiguration config = createConfiguration(); config.setMaxEventFileLife(1, TimeUnit.SECONDS); repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final String uuid = "00000000-0000-0000-0000-000000000000"; final Map<String, String> attributes = new HashMap<>(); @@ -279,7 +279,7 @@ public class TestMiNiFiPersistentProvenanceRepository { repo.close(); final MiNiFiPersistentProvenanceRepository secondRepo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - secondRepo.initialize(getEventReporter()); + secondRepo.initialize(getEventReporter(), null, null); try { final ProvenanceEventRecord event11 = builder.build(); @@ -336,7 +336,7 @@ public class TestMiNiFiPersistentProvenanceRepository { config.setDesiredIndexSize(10); repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); String uuid = UUID.randomUUID().toString(); for (int i = 0; i < 20; i++) { @@ -369,7 +369,7 @@ public class TestMiNiFiPersistentProvenanceRepository { return journalCountRef.get(); } }; - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final Map<String, String> attributes = new HashMap<>(); final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); @@ -422,7 +422,7 @@ public class TestMiNiFiPersistentProvenanceRepository { final RepositoryConfiguration config = createConfiguration(); config.setMaxEventFileLife(3, TimeUnit.SECONDS); repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final Map<String, String> attributes = new HashMap<>(); @@ -473,7 +473,7 @@ public class TestMiNiFiPersistentProvenanceRepository { config.setMaxAttributeChars(50); config.setMaxEventFileLife(3, TimeUnit.SECONDS); repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS); - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final Map<String, String> attributes = new HashMap<>(); attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345"); @@ -520,7 +520,7 @@ public class TestMiNiFiPersistentProvenanceRepository { }; // initialize with our event reporter - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); // create some events in the journal files. final Map<String, String> attributes = new HashMap<>(); @@ -597,7 +597,7 @@ public class TestMiNiFiPersistentProvenanceRepository { return spiedWriters; } }; - repo.initialize(getEventReporter()); + repo.initialize(getEventReporter(), null, null); final Map<String, String> attributes = new HashMap<>(); attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345"); @@ -658,7 +658,7 @@ public class TestMiNiFiPersistentProvenanceRepository { }; try { - recoveryRepo.initialize(getEventReporter()); + recoveryRepo.initialize(getEventReporter(), null, null); } finally { recoveryRepo.close(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/52888507/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java index 26766d6..935d439 100644 --- a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java +++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java @@ -17,9 +17,7 @@ package org.apache.nifi.provenance; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import org.apache.nifi.flowfile.FlowFile; @@ -39,13 +37,13 @@ public class TestUtil { } @Override - public Set<String> getLineageIdentifiers() { - return new HashSet<String>(); + public long getLineageStartDate() { + return System.currentTimeMillis(); } @Override - public long getLineageStartDate() { - return System.currentTimeMillis(); + public long getLineageStartIndex() { + return 0; } @Override @@ -54,6 +52,11 @@ public class TestUtil { } @Override + public long getQueueDateIndex() { + return 0; + } + + @Override public boolean isPenalized() { return false; } http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/52888507/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/pom.xml b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/pom.xml index ee397e2..20c0f77 100644 --- a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/pom.xml +++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/pom.xml @@ -25,14 +25,12 @@ limitations under the License. <groupId>org.apache.nifi.minifi</groupId> <artifactId>minifi-provenance-repository-nar</artifactId> - <version>0.1.0-SNAPSHOT</version> <packaging>nar</packaging> <dependencies> <dependency> <groupId>org.apache.nifi.minifi</groupId> <artifactId>minifi-persistent-provenance-repository</artifactId> - <version>0.1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.nifi</groupId> http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/52888507/minifi-nar-bundles/minifi-ssl-context-service-nar/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-ssl-context-service-nar/pom.xml b/minifi-nar-bundles/minifi-ssl-context-service-nar/pom.xml new file mode 100644 index 0000000..35f27fa --- /dev/null +++ b/minifi-nar-bundles/minifi-ssl-context-service-nar/pom.xml @@ -0,0 +1,61 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi.minifi</groupId> + <artifactId>minifi-nar-bundles</artifactId> + <version>0.1.0-SNAPSHOT</version> + </parent> + <groupId>org.apache.nifi.minifi</groupId> + <artifactId>minifi-ssl-context-service-nar</artifactId> + <packaging>nar</packaging> + <properties> + <maven.javadoc.skip>true</maven.javadoc.skip> + <source.skip>true</source.skip> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.nifi.minifi</groupId> + <artifactId>minifi-standard-services-api-nar</artifactId> + <type>nar</type> + <exclusions> + <exclusion> + <groupId>org.bouncycastle</groupId> + <artifactId>bcprov-jdk15on</artifactId> + </exclusion> + <exclusion> + <groupId>org.bouncycastle</groupId> + <artifactId>bcpg-jdk15on</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service</artifactId> + <exclusions> + <exclusion> + <groupId>org.bouncycastle</groupId> + <artifactId>bcprov-jdk15on</artifactId> + </exclusion> + <exclusion> + <groupId>org.bouncycastle</groupId> + <artifactId>bcpg-jdk15on</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> +</project>
