Repository: nifi
Updated Branches:
  refs/heads/master dd50745a9 -> 2b435cdfc


NIFI-3985: This closes #1864. Added 'Start Position' property to 
SiteToSiteProvenanceReportingTask; also added additionalDetails.html that explains the 
schema, and updated the reporting task to stop publishing when the user clicks 
'Stop' instead of running indefinitely until the reporting task has caught up.

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2b435cdf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2b435cdf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2b435cdf

Branch: refs/heads/master
Commit: 2b435cdfc6fd0824d9eb5f2cf140a330c9f258ed
Parents: dd50745
Author: Mark Payne <[email protected]>
Authored: Fri May 26 10:59:47 2017 -0400
Committer: joewitt <[email protected]>
Committed: Fri May 26 21:05:45 2017 -0500

----------------------------------------------------------------------
 .../SiteToSiteProvenanceReportingTask.java      | 54 ++++++++++++--
 .../additionalDetails.html                      | 77 ++++++++++++++++++++
 .../TestSiteToSiteProvenanceReportingTask.java  | 23 +++---
 3 files changed, 135 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2b435cdf/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index cae4d17..37b5070 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -23,6 +23,8 @@ import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
@@ -71,6 +73,11 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
     static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
     static final String LAST_EVENT_ID_KEY = "last_event_id";
 
+    static final AllowableValue BEGINNING_OF_STREAM = new 
AllowableValue("beginning-of-stream", "Beginning of Stream",
+        "Start reading provenance Events from the beginning of the stream (the 
oldest event first)");
+    static final AllowableValue END_OF_STREAM = new 
AllowableValue("end-of-stream", "End of Stream",
+        "Start reading provenance Events from the end of the stream, ignoring 
old events");
+
     static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
         .name("Platform")
         .displayName("Platform")
@@ -83,7 +90,7 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
 
     static final PropertyDescriptor FILTER_EVENT_TYPE = new 
PropertyDescriptor.Builder()
         .name("s2s-prov-task-event-filter")
-        .displayName("Event type")
+        .displayName("Event Type")
         .description("Comma-separated list of event types that will be used to 
filter the provenance events sent by the reporting task. "
                 + "Available event types are " + ProvenanceEventType.values() 
+ ". If no filter is set, all the events are sent. If "
                         + "multiple filters are set, the filters are 
cumulative.")
@@ -93,7 +100,7 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
 
     static final PropertyDescriptor FILTER_COMPONENT_TYPE = new 
PropertyDescriptor.Builder()
         .name("s2s-prov-task-type-filter")
-        .displayName("Component type")
+        .displayName("Component Type")
         .description("Regular expression to filter the provenance events based 
on the component type. Only the events matching the regular "
                 + "expression will be sent. If no filter is set, all the 
events are sent. If multiple filters are set, the filters are cumulative.")
         .required(false)
@@ -109,11 +116,21 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
+    static final PropertyDescriptor START_POSITION = new 
PropertyDescriptor.Builder()
+        .name("start-position")
+        .displayName("Start Position")
+        .description("If the Reporting Task has never been run, or if its 
state has been reset by a user, specifies where in the stream of Provenance 
Events the Reporting Task should start")
+        .allowableValues(BEGINNING_OF_STREAM, END_OF_STREAM)
+        .defaultValue(BEGINNING_OF_STREAM.getValue())
+        .required(true)
+        .build();
+
     private volatile long firstEventId = -1L;
     private volatile boolean isFilteringEnabled = false;
     private volatile Pattern componentTypeRegex;
     private volatile List<ProvenanceEventType> eventTypes = new 
ArrayList<ProvenanceEventType>();
     private volatile List<String> componentIds = new ArrayList<String>();
+    private volatile boolean scheduled = false;
 
     @OnScheduled
     public void onScheduled(final ConfigurationContext context) throws 
IOException {
@@ -139,6 +156,17 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
 
         // set a boolean whether filtering will be applied or not
         isFilteringEnabled = componentTypeRegex != null || 
!eventTypes.isEmpty() || !componentIds.isEmpty();
+
+        scheduled = true;
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        scheduled = false;
+    }
+
+    public boolean isScheduled() {
+        return scheduled;
     }
 
     @Override
@@ -148,6 +176,7 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
         properties.add(FILTER_EVENT_TYPE);
         properties.add(FILTER_COMPONENT_TYPE);
         properties.add(FILTER_COMPONENT_ID);
+        properties.add(START_POSITION);
         return properties;
     }
 
@@ -210,14 +239,27 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
                 getLogger().error("Failed to get state at start up due to:" + 
e.getMessage(), e);
                 return;
             }
+
+            final String startPositionValue = 
context.getProperty(START_POSITION).getValue();
+
             if (state.containsKey(LAST_EVENT_ID_KEY)) {
                 firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 
1;
+            } else {
+                if (END_OF_STREAM.getValue().equals(startPositionValue)) {
+                    firstEventId = currMaxId;
+                }
             }
 
-            if(currMaxId < (firstEventId - 1)){
-                getLogger().warn("Current provenance max id is {} which is 
less than what was stored in state as the last queried event, which was {}. 
This means the provenance restarted its " +
+            if (currMaxId < (firstEventId - 1)) {
+                if (BEGINNING_OF_STREAM.getValue().equals(startPositionValue)) 
{
+                    getLogger().warn("Current provenance max id is {} which is 
less than what was stored in state as the last queried event, which was {}. 
This means the provenance restarted its " +
                         "ids. Restarting querying from the beginning.", new 
Object[]{currMaxId, firstEventId});
-                firstEventId = -1;
+                    firstEventId = -1;
+                } else {
+                    getLogger().warn("Current provenance max id is {} which is 
less than what was stored in state as the last queried event, which was {}. 
This means the provenance restarted its " +
+                        "ids. Restarting querying from the latest event in the 
Provenance Repository.", new Object[] {currMaxId, firstEventId});
+                    firstEventId = currMaxId;
+                }
             }
         }
 
@@ -258,7 +300,7 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
         final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
         df.setTimeZone(TimeZone.getTimeZone("Z"));
 
-        while (events != null && !events.isEmpty()) {
+        while (events != null && !events.isEmpty() && isScheduled()) {
             final long start = System.nanoTime();
 
             // Create a JSON array of all the events in the current batch

http://git-wip-us.apache.org/repos/asf/nifi/blob/2b435cdf/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html
new file mode 100644
index 0000000..7e8204c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html
@@ -0,0 +1,77 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>SiteToSiteProvenanceReportingTask</title>
+
+        <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" 
type="text/css" />
+    </head>
+
+    <body>
+       <p>
+               The Site-to-Site Provenance Reporting Task allows the user to 
publish all of the Provenance Events from a NiFi instance back to
+               the same NiFi instance or another NiFi instance. This provides 
a great deal of power because it allows the user to make use of
+               all of the different Processors that are available in NiFi in 
order to process or distribute that data. When possible, it is
+               advisable to send the Provenance data to a different NiFi 
instance than the one that this Reporting Task is running on, because
+               when the data is received over Site-to-Site and processed, that 
in and of itself will generate Provenance events. As a result, there
+               is a cycle that is created. However, the data is sent in 
batches (1,000 by default). This means that for each batch of Provenance events
+               that are sent back to NiFi, the receiving NiFi will have to 
generate only a single event per component.
+       </p>
+       
+       <p>
+               When published to a NiFi instance, the Provenance data is sent 
as a JSON array. Quite often, it can be useful to work with this data using
+               a schema. As such, the schema for this Provenance data can be 
defined as follows:
+       </p>
+
+<pre>
+<code>
+{
+  "namespace": "nifi",
+  "name": "provenanceEvent",
+  "type": "record",
+  "fields": [
+    { "name": "eventId", "type": "string" },
+    { "name": "eventOrdinal", "type": "long" },
+    { "name": "eventType", "type": "string" },
+    { "name": "timestampMillis", "type": "long" },
+    { "name": "durationMillis", "type": "long" },
+    { "name": "lineageStart", "type": { "type": "long", "logicalType": 
"timestamp-millis" } },
+    { "name": "details", "type": "string" },
+    { "name": "componentId", "type": "string" },
+    { "name": "componentType", "type": "string" },
+    { "name": "entityId", "type": "string" },
+    { "name": "entityType", "type": "string" },
+    { "name": "entitySize", "type": ["null", "long"] },
+    { "name": "previousEntitySize", "type": ["null", "long"] },
+    { "name": "updatedAttributes", "type": { "type": "map", "values": "string" 
} },
+    { "name": "previousAttributes", "type": { "type": "map", "values": 
"string" } },
+    { "name": "actorHostname", "type": "string" },
+    { "name": "contentURI", "type": "string" },
+    { "name": "previousContentURI", "type": "string" },
+    { "name": "parentIds", "type": { "type": "array", "items": "string" } },
+    { "name": "childIds", "type": { "type": "array", "items": "string" } },
+    { "name": "platform", "type": "string" },
+    { "name": "application", "type": "string" },
+    { "name": "transitUri", "type": ["null", "string"] }
+  ]
+}
+</code>
+</pre>
+
+       
+       </body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2b435cdf/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
index a396ac8..86cbb74 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
@@ -95,7 +95,10 @@ public class TestSiteToSiteProvenanceReportingTask {
 
                 final List<ProvenanceEventRecord> eventsToReturn = new 
ArrayList<>();
                 for (int i = (int) Math.max(0, startId); i < (int) (startId + 
maxRecords) && totalEvents.get() < maxEventId; i++) {
-                    eventsToReturn.add(event);
+                    if (event != null) {
+                        eventsToReturn.add(event);
+                    }
+
                     totalEvents.getAndIncrement();
                 }
                 return eventsToReturn;
@@ -304,7 +307,12 @@ public class TestSiteToSiteProvenanceReportingTask {
         final long maxEventId = 2500;
 
         // create the mock reporting task and mock state manager
-        final MockSiteToSiteProvenanceReportingTask task = new 
MockSiteToSiteProvenanceReportingTask();
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : new 
MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+
+        final MockSiteToSiteProvenanceReportingTask task = setup(null, 
properties);
         final MockStateManager stateManager = new MockStateManager(task);
 
         // create the state map and set the last id to the same value as 
maxEventId
@@ -312,10 +320,6 @@ public class TestSiteToSiteProvenanceReportingTask {
         state.put(SiteToSiteProvenanceReportingTask.LAST_EVENT_ID_KEY, 
String.valueOf(maxEventId));
         stateManager.setState(state, Scope.LOCAL);
 
-        // setup the mock reporting context to return the mock state manager
-        final ReportingContext context = Mockito.mock(ReportingContext.class);
-        Mockito.when(context.getStateManager()).thenReturn(stateManager);
-
         // setup the mock provenance repository to return maxEventId
         final ProvenanceEventRepository provenanceRepository = 
Mockito.mock(ProvenanceEventRepository.class);
         Mockito.doAnswer(new Answer<Long>() {
@@ -327,15 +331,8 @@ public class TestSiteToSiteProvenanceReportingTask {
 
         // setup the mock EventAccess to return the mock provenance repository
         final EventAccess eventAccess = Mockito.mock(EventAccess.class);
-        Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
         
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
 
-        // setup the mock initialization context
-        final ComponentLog logger = Mockito.mock(ComponentLog.class);
-        final ReportingInitializationContext initContext = 
Mockito.mock(ReportingInitializationContext.class);
-        
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
-        Mockito.when(initContext.getLogger()).thenReturn(logger);
-
         task.initialize(initContext);
 
         // execute the reporting task and should not produce any data b/c max 
id same as previous id

Reply via email to