Repository: nifi
Updated Branches:
  refs/heads/master 6201c06c9 -> acf05e063


NIFI-4468: If an entire batch of Provenance Events was read by the Site-to-Site
Provenance Reporting Task and none of them matched the filters, then the
reporting task did not update its state, so it would be stuck in this cycle
indefinitely. Fixed this so that if any event is read from the provenance
repository, regardless of whether or not it matches the filters, the state is
updated to keep track of what has been processed.

This closes #2198.

Signed-off-by: Joe Skora <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/acf05e06
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/acf05e06
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/acf05e06

Branch: refs/heads/master
Commit: acf05e0636f6a766d3cfa3d76c10ab6dbbb78c93
Parents: 6201c06
Author: Mark Payne <[email protected]>
Authored: Fri Oct 6 14:53:04 2017 -0400
Committer: Joe Skora <[email protected]>
Committed: Sun Oct 8 15:34:48 2017 -0400

----------------------------------------------------------------------
 .../SiteToSiteProvenanceReportingTask.java      | 113 +++++++++++--------
 1 file changed, 64 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/acf05e06/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index e998deb..8af9412 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -272,15 +272,17 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
             return;
         }
 
-        List<ProvenanceEventRecord> events;
+        List<ProvenanceEventRecord> rawEvents;
+        List<ProvenanceEventRecord> filteredEvents;
         try {
-            events = 
filterEvents(context.getEventAccess().getProvenanceEvents(firstEventId, 
context.getProperty(BATCH_SIZE).asInteger()));
+            rawEvents = 
context.getEventAccess().getProvenanceEvents(firstEventId, 
context.getProperty(BATCH_SIZE).asInteger());
+            filteredEvents = filterEvents(rawEvents);
         } catch (final IOException ioe) {
             getLogger().error("Failed to retrieve Provenance Events from 
repository due to: " + ioe.getMessage(), ioe);
             return;
         }
 
-        if (events == null || events.isEmpty()) {
+        if (rawEvents == null || rawEvents.isEmpty()) {
             getLogger().debug("No events to send due to 'events' being null or 
empty.");
             return;
         }
@@ -304,69 +306,82 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
         final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
         df.setTimeZone(TimeZone.getTimeZone("Z"));
 
-        while (events != null && !events.isEmpty() && isScheduled()) {
+        while (rawEvents != null && !rawEvents.isEmpty() && isScheduled()) {
             final long start = System.nanoTime();
 
-            // Create a JSON array of all the events in the current batch
-            final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
-            for (final ProvenanceEventRecord event : events) {
-                final String componentName = 
componentMap.get(event.getComponentId());
-                arrayBuilder.add(serialize(factory, builder, event, df, 
componentName, hostname, url, rootGroupName, platform, nodeId));
-            }
-            final JsonArray jsonArray = arrayBuilder.build();
-
-            // Send the JSON document for the current batch
-            try {
-                final Transaction transaction = 
getClient().createTransaction(TransferDirection.SEND);
-                if (transaction == null) {
-                    getLogger().debug("All destination nodes are penalized; 
will attempt to send data later");
-                    return;
+            if (!filteredEvents.isEmpty()) {
+                // Create a JSON array of all the events in the current batch
+                final JsonArrayBuilder arrayBuilder = 
factory.createArrayBuilder();
+                for (final ProvenanceEventRecord event : filteredEvents) {
+                    final String componentName = 
componentMap.get(event.getComponentId());
+                    arrayBuilder.add(serialize(factory, builder, event, df, 
componentName, hostname, url, rootGroupName, platform, nodeId));
                 }
+                final JsonArray jsonArray = arrayBuilder.build();
 
-                final Map<String, String> attributes = new HashMap<>();
-                final String transactionId = UUID.randomUUID().toString();
-                attributes.put("reporting.task.transaction.id", transactionId);
-                attributes.put("mime.type", "application/json");
-
-                final byte[] data = 
jsonArray.toString().getBytes(StandardCharsets.UTF_8);
-                transaction.send(data, attributes);
-                transaction.confirm();
-                transaction.complete();
-
-                final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                getLogger().info("Successfully sent {} Provenance Events to 
destination in {} ms; Transaction ID = {}; First Event ID = {}",
-                        new Object[]{events.size(), transferMillis, 
transactionId, events.get(0).getEventId()});
-            } catch (final IOException e) {
-                throw new ProcessException("Failed to send Provenance Events 
to destination due to IOException:" + e.getMessage(), e);
-            }
-
-            // Store the id of the last event so we know where we left off
-            final ProvenanceEventRecord lastEvent = events.get(events.size() - 
1);
-            final String lastEventId = String.valueOf(lastEvent.getEventId());
-            try {
-                StateManager stateManager = context.getStateManager();
-                Map<String, String> newMapOfState = new HashMap<>();
-                newMapOfState.put(LAST_EVENT_ID_KEY, lastEventId);
-                stateManager.setState(newMapOfState, Scope.LOCAL);
-            } catch (final IOException ioe) {
-                getLogger().error("Failed to update state to {} due to {}; 
this could result in events being re-sent after a restart. The message of {} 
was: {}",
-                        new Object[]{lastEventId, ioe, ioe, ioe.getMessage()}, 
ioe);
+                // Send the JSON document for the current batch
+                try {
+                    final Transaction transaction = 
getClient().createTransaction(TransferDirection.SEND);
+                    if (transaction == null) {
+                        getLogger().debug("All destination nodes are 
penalized; will attempt to send data later");
+                        return;
+                    }
+
+                    final Map<String, String> attributes = new HashMap<>();
+                    final String transactionId = UUID.randomUUID().toString();
+                    attributes.put("reporting.task.transaction.id", 
transactionId);
+                    attributes.put("mime.type", "application/json");
+
+                    final byte[] data = 
jsonArray.toString().getBytes(StandardCharsets.UTF_8);
+                    transaction.send(data, attributes);
+                    transaction.confirm();
+                    transaction.complete();
+
+                    final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                    getLogger().info("Successfully sent {} Provenance Events 
to destination in {} ms; Transaction ID = {}; First Event ID = {}",
+                        new Object[] {filteredEvents.size(), transferMillis, 
transactionId, rawEvents.get(0).getEventId()});
+                } catch (final IOException e) {
+                    throw new ProcessException("Failed to send Provenance 
Events to destination due to IOException:" + e.getMessage(), e);
+                }
             }
 
-            firstEventId = lastEvent.getEventId() + 1;
+            firstEventId = updateLastEventId(rawEvents, 
context.getStateManager());
 
             // Retrieve the next batch
             try {
-                events = 
filterEvents(context.getEventAccess().getProvenanceEvents(firstEventId, 
context.getProperty(BATCH_SIZE).asInteger()));
+                rawEvents = 
context.getEventAccess().getProvenanceEvents(firstEventId, 
context.getProperty(BATCH_SIZE).asInteger());
+                filteredEvents = filterEvents(rawEvents);
             } catch (final IOException ioe) {
                 getLogger().error("Failed to retrieve Provenance Events from 
repository due to: " + ioe.getMessage(), ioe);
                 return;
             }
         }
+    }
+
+    private long updateLastEventId(final List<ProvenanceEventRecord> events, 
final StateManager stateManager) {
+        if (events == null || events.isEmpty()) {
+            return firstEventId;
+        }
+
+        // Store the id of the last event so we know where we left off
+        final ProvenanceEventRecord lastEvent = events.get(events.size() - 1);
+        final String lastEventId = String.valueOf(lastEvent.getEventId());
+        try {
+            Map<String, String> newMapOfState = new HashMap<>();
+            newMapOfState.put(LAST_EVENT_ID_KEY, lastEventId);
+            stateManager.setState(newMapOfState, Scope.LOCAL);
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to update state to {} due to {}; this 
could result in events being re-sent after a restart. The message of {} was: 
{}",
+                new Object[] {lastEventId, ioe, ioe, ioe.getMessage()}, ioe);
+        }
 
+        return lastEvent.getEventId() + 1;
     }
 
-    private List<ProvenanceEventRecord> 
filterEvents(List<ProvenanceEventRecord> provenanceEvents) {
+    private List<ProvenanceEventRecord> filterEvents(final 
List<ProvenanceEventRecord> provenanceEvents) {
+        if (provenanceEvents == null || provenanceEvents.isEmpty()) {
+            return Collections.emptyList();
+        }
+
         if(isFilteringEnabled) {
             List<ProvenanceEventRecord> filteredEvents = new 
ArrayList<ProvenanceEventRecord>();
 

Reply via email to