[
https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237818#comment-15237818
]
ASF GitHub Bot commented on MINIFI-13:
--------------------------------------
Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi-minifi/pull/6#discussion_r59438406
--- Diff:
minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java
---
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.provenance.reporting;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+@Tags({"provenance", "lineage", "tracking", "site", "site to site"})
+@CapabilityDescription("Publishes Provenance events using the Site To Site
protocol.")
+@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's
last event Id so that on restart of NiFi the task knows where it left off.")
+public class ProvenanceReportingTask extends AbstractReportingTask {
+ private static final String TIMESTAMP_FORMAT =
"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+ private static final String LAST_EVENT_ID_KEY = "last_event_id";
+
+ static final PropertyDescriptor DESTINATION_URL = new
PropertyDescriptor.Builder()
+ .name("Destination URL")
+ .description("The URL to post the Provenance Events to.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.URL_VALIDATOR)
+ .build();
+ static final PropertyDescriptor PORT_NAME = new
PropertyDescriptor.Builder()
+ .name("Input Port Name")
+ .description("The name of the Input Port to delivery Provenance
Events to.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ static final PropertyDescriptor SSL_CONTEXT = new
PropertyDescriptor.Builder()
+ .name("SSL Context Service")
+ .description("The SSL Context Service to use when communicating
with the destination. If not specified, communications will not be secure.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+ static final PropertyDescriptor MINIFI_URL = new
PropertyDescriptor.Builder()
+ .name("MiNiFi URL")
+ .description("The URL of this MiNiFi instance. This is used to
include the Content URI to send to the destination.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .defaultValue("http://${hostname(true)}:8080/nifi")
+ .addValidator(new NiFiUrlValidator())
+ .build();
+ static final PropertyDescriptor COMPRESS = new
PropertyDescriptor.Builder()
+ .name("Compress Events")
+ .description("Indicates whether or not to compress the events when
being sent.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+ static final PropertyDescriptor TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("Communications Timeout")
+ .description("Specifies how long to wait to a response from the
destination before deciding that an error has occurred and canceling the
transaction")
+ .required(true)
+ .defaultValue("30 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+ static final PropertyDescriptor BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("Batch Size")
+ .description("Specifies how many records to send in a single
batch, at most.")
+ .required(true)
+ .defaultValue("1000")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ private volatile long firstEventId = -1L;
+ private volatile SiteToSiteClient siteToSiteClient;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(DESTINATION_URL);
+ properties.add(PORT_NAME);
+ properties.add(SSL_CONTEXT);
+ properties.add(MINIFI_URL);
+ properties.add(COMPRESS);
+ properties.add(TIMEOUT);
+ properties.add(BATCH_SIZE);
+ return properties;
+ }
+
+ @OnScheduled
+ public void setup(final ConfigurationContext context) throws
IOException {
+ final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
+ final SSLContext sslContext = sslContextService == null ? null :
sslContextService.createSSLContext(ClientAuth.REQUIRED);
+ final EventReporter eventReporter = new EventReporter() {
+ @Override
+ public void reportEvent(final Severity severity, final String
category, final String message) {
+ switch (severity) {
+ case WARNING:
+ getLogger().warn(message);
+ break;
+ case ERROR:
+ getLogger().error(message);
+ break;
+ default:
+ break;
+ }
+ }
+ };
+
+ final String destinationUrlPrefix =
context.getProperty(DESTINATION_URL).evaluateAttributeExpressions().getValue();
+ final String destinationUrl = destinationUrlPrefix +
(destinationUrlPrefix.endsWith("/") ? "nifi" : "/nifi");
+
+ siteToSiteClient = new SiteToSiteClient.Builder()
+ .url(destinationUrl)
+ .portName(context.getProperty(PORT_NAME).getValue())
+ .useCompression(context.getProperty(COMPRESS).asBoolean())
+ .eventReporter(eventReporter)
+ .sslContext(sslContext)
+
.timeout(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS)
+ .build();
+ }
+
+ @OnStopped
+ public void shutdown() throws IOException {
+ final SiteToSiteClient client = getClient();
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ // this getter is intended explicitly for testing purposes
+ protected SiteToSiteClient getClient() {
+ return this.siteToSiteClient;
+ }
+
+ private String getComponentName(final ProcessGroupStatus status, final
ProvenanceEventRecord event) {
+ if (status == null) {
+ return null;
+ }
+
+ final String componentId = event.getComponentId();
+ if (status.getId().equals(componentId)) {
+ return status.getName();
+ }
+
+ for (final ProcessorStatus procStatus :
status.getProcessorStatus()) {
+ if (procStatus.getId().equals(componentId)) {
+ return procStatus.getName();
+ }
+ }
+
+ for (final PortStatus portStatus : status.getInputPortStatus()) {
+ if (portStatus.getId().equals(componentId)) {
+ return portStatus.getName();
+ }
+ }
+
+ for (final PortStatus portStatus : status.getOutputPortStatus()) {
+ if (portStatus.getId().equals(componentId)) {
+ return portStatus.getName();
+ }
+ }
+
+ for (final RemoteProcessGroupStatus rpgStatus :
status.getRemoteProcessGroupStatus()) {
+ if (rpgStatus.getId().equals(componentId)) {
+ return rpgStatus.getName();
+ }
+ }
+
+ for (final ProcessGroupStatus childGroup :
status.getProcessGroupStatus()) {
+ final String componentName = getComponentName(childGroup,
event);
+ if (componentName != null) {
+ return componentName;
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public void onTrigger(final ReportingContext context) {
+ final ProcessGroupStatus procGroupStatus =
context.getEventAccess().getControllerStatus();
+ final String rootGroupName = procGroupStatus == null ? null :
procGroupStatus.getName();
+
+ if (firstEventId < 0) {
+ Map<String, String> state;
+ try {
+ state =
context.getStateManager().getState(Scope.LOCAL).toMap();
+ } catch (IOException e) {
+ getLogger().error("Failed to get state at start up due to
{}", e);
+ return;
+ }
+ if (state.containsKey(LAST_EVENT_ID_KEY)) {
--- End diff --
Ah, good catch. I think the best we can do is to check whether the current
max ID in provenance is less than the last ID queried (stored in state).
> Create a Reporting Task to Send Provenance data
> -----------------------------------------------
>
> Key: MINIFI-13
> URL: https://issues.apache.org/jira/browse/MINIFI-13
> Project: Apache NiFi MiNiFi
> Issue Type: Sub-task
> Components: Data Format, Data Transmission
> Reporter: Joseph Percivall
> Assignee: Joseph Percivall
> Fix For: 0.0.1
>
>
> With the initial effort to re-use as much of NiFi as possible, it is not
> possible to easily create a ProvenanceReporter to add provenance events as
> attributes to FlowFiles, as it would require changing the ProvenanceReporter
> interface. This will require utilizing a different extension point to
> transmit the provenance data back to a core NiFi instance.
> Probably the most efficient way to do this is to create a ReportingTask which
> reports the provenance events using the S2S protocol.
> In the future, this will probably be retired as a reporting task as MiNiFi
> grows to rely less on NiFi, but this reporting task could also be contributed
> back to NiFi.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)