NIFI-10: Added FETCH and DOWNLOAD Provenance Events; updated FlowController to 
use DOWNLOAD event instead of SEND whenever a user downloads/views content via 
Provenance Event


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fc2aa276
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fc2aa276
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fc2aa276

Branch: refs/heads/NIFI-655
Commit: fc2aa2764cc9e85a19d3f3eec640873f43c60148
Parents: 51f5640
Author: Mark Payne <[email protected]>
Authored: Sun Oct 25 11:53:46 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Mon Oct 26 14:58:50 2015 -0400

----------------------------------------------------------------------
 .../nifi/provenance/ProvenanceEventType.java    |   5 +
 .../apache/nifi/controller/FlowController.java  | 130 +++++++++----------
 2 files changed, 69 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fc2aa276/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
index 188e8fc..0d844b8 100644
--- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
+++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
@@ -47,6 +47,11 @@ public enum ProvenanceEventType {
     SEND,
 
     /**
+     * Indicates that the contents of a FlowFile were downloaded by a user or external entity.
+     */
+    DOWNLOAD,
+
+    /**
      * Indicates a provenance event for the conclusion of an object's life for
      * some reason other than object expiration
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc2aa276/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index d9c3f39..3f815b0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -216,7 +216,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     public static final String SCHEDULE_MINIMUM_NANOSECONDS = 
"flowcontroller.minimum.nanoseconds";
     public static final String GRACEFUL_SHUTDOWN_PERIOD = 
"nifi.flowcontroller.graceful.shutdown.seconds";
     public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10;
-    public static final int METRICS_RESERVOIR_SIZE = 288;   // 1 day worth of 
5-minute captures
+    public static final int METRICS_RESERVOIR_SIZE = 288; // 1 day worth of 
5-minute captures
 
     public static final String ROOT_GROUP_ID_ALIAS = "root";
     public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow";
@@ -245,7 +245,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     private final UserService userService;
     private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
     private final ComponentStatusRepository componentStatusRepository;
-    private final long systemStartTime = System.currentTimeMillis();    // 
time at which the node was started
+    private final long systemStartTime = System.currentTimeMillis(); // time 
at which the node was started
     private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = 
new ConcurrentHashMap<>();
 
     // The Heartbeat Bean is used to provide an Atomic Reference to data that 
is used in heartbeats that may
@@ -336,38 +336,38 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     private final Lock readLock = rwLock.readLock();
     private final Lock writeLock = rwLock.writeLock();
 
-    private FlowFileSwapManager flowFileSwapManager;    // guarded by 
read/write lock
+    private FlowFileSwapManager flowFileSwapManager; // guarded by read/write 
lock
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FlowController.class);
     private static final Logger heartbeatLogger = 
LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat");
 
     public static FlowController createStandaloneInstance(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor) {
+        final FlowFileEventRepository flowFileEventRepo,
+        final NiFiProperties properties,
+        final UserService userService,
+        final StringEncryptor encryptor) {
         return new FlowController(
-                flowFileEventRepo,
-                properties,
-                userService,
-                encryptor,
-                /* configuredForClustering */ false,
-                /* NodeProtocolSender */ null);
+            flowFileEventRepo,
+            properties,
+            userService,
+            encryptor,
+            /* configuredForClustering */ false,
+            /* NodeProtocolSender */ null);
     }
 
     public static FlowController createClusteredInstance(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor,
-            final NodeProtocolSender protocolSender) {
+        final FlowFileEventRepository flowFileEventRepo,
+        final NiFiProperties properties,
+        final UserService userService,
+        final StringEncryptor encryptor,
+        final NodeProtocolSender protocolSender) {
         final FlowController flowController = new FlowController(
-                flowFileEventRepo,
-                properties,
-                userService,
-                encryptor,
-                /* configuredForClustering */ true,
-                /* NodeProtocolSender */ protocolSender);
+            flowFileEventRepo,
+            properties,
+            userService,
+            encryptor,
+            /* configuredForClustering */ true,
+            /* NodeProtocolSender */ protocolSender);
 
         
flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), 
properties.isSiteToSiteSecure());
 
@@ -375,12 +375,12 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     }
 
     private FlowController(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor,
-            final boolean configuredForClustering,
-            final NodeProtocolSender protocolSender) {
+        final FlowFileEventRepository flowFileEventRepo,
+        final NiFiProperties properties,
+        final UserService userService,
+        final StringEncryptor encryptor,
+        final boolean configuredForClustering,
+        final NodeProtocolSender protocolSender) {
 
         maxTimerDrivenThreads = new AtomicInteger(10);
         maxEventDrivenThreads = new AtomicInteger(5);
@@ -416,7 +416,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         final ProcessContextFactory contextFactory = new 
ProcessContextFactory(contentRepository, flowFileRepository, 
flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository);
         processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, 
new EventDrivenSchedulingAgent(
-                eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, 
contextFactory, maxEventDrivenThreads.get(), encryptor));
+            eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, 
contextFactory, maxEventDrivenThreads.get(), encryptor));
 
         final QuartzSchedulingAgent quartzSchedulingAgent = new 
QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, 
encryptor);
         final TimerDrivenSchedulingAgent timerDrivenAgent = new 
TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, 
encryptor);
@@ -468,7 +468,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             externalSiteListener = null;
         } else if (isSiteToSiteSecure && sslContext == null) {
             LOG.error("Unable to create Secure Site-to-Site Listener because 
not all required Keystore/Truststore "
-                    + "Properties are set. Site-to-Site functionality will be 
disabled until this problem is has been fixed.");
+                + "Properties are set. Site-to-Site functionality will be 
disabled until this problem is has been fixed.");
             externalSiteListener = null;
         } else {
             // Register the SocketFlowFileServerProtocol as the appropriate 
resource for site-to-site Server Protocol
@@ -501,7 +501,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         final String implementationClassName = 
properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, 
DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create FlowFile Repository 
because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
+                + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
         }
 
         try {
@@ -612,7 +612,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                             startConnectable(connectable);
                         }
                     } catch (final Throwable t) {
-                        LOG.error("Unable to start {} due to {}", new 
Object[]{connectable, t.toString()});
+                        LOG.error("Unable to start {} due to {}", new Object[] 
{connectable, t.toString()});
                         if (LOG.isDebugEnabled()) {
                             LOG.error("", t);
                         }
@@ -627,7 +627,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                         
remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
                         startedTransmitting++;
                     } catch (final Throwable t) {
-                        LOG.error("Unable to start transmitting with {} due to 
{}", new Object[]{remoteGroupPort, t});
+                        LOG.error("Unable to start transmitting with {} due to 
{}", new Object[] {remoteGroupPort, t});
                     }
                 }
 
@@ -642,7 +642,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                             startConnectable(connectable);
                         }
                     } catch (final Throwable t) {
-                        LOG.error("Unable to start {} due to {}", new 
Object[]{connectable, t});
+                        LOG.error("Unable to start {} due to {}", new Object[] 
{connectable, t});
                     }
                 }
 
@@ -658,7 +658,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         final String implementationClassName = 
properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, 
DEFAULT_CONTENT_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create Provenance Repository 
because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
+                + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
         }
 
         try {
@@ -676,7 +676,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         final String implementationClassName = 
properties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, 
DEFAULT_PROVENANCE_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create Provenance Repository 
because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
+                + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
         }
 
         try {
@@ -690,7 +690,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         final String implementationClassName = 
properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION,
 DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create Component Status 
Repository because the NiFi Properties is missing the following property: "
-                    + 
NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+                + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
         }
 
         try {
@@ -910,7 +910,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         name = requireNonNull(name).intern();
         verifyPortIdDoesNotExist(id);
         return new StandardRootGroupPort(id, name, null, 
TransferDirection.RECEIVE, ConnectableType.INPUT_PORT,
-                userService, getBulletinRepository(), processScheduler, 
Boolean.TRUE.equals(isSiteToSiteSecure));
+            userService, getBulletinRepository(), processScheduler, 
Boolean.TRUE.equals(isSiteToSiteSecure));
     }
 
     /**
@@ -927,7 +927,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         name = requireNonNull(name).intern();
         verifyPortIdDoesNotExist(id);
         return new StandardRootGroupPort(id, name, null, 
TransferDirection.SEND, ConnectableType.OUTPUT_PORT,
-                userService, getBulletinRepository(), processScheduler, 
Boolean.TRUE.equals(isSiteToSiteSecure));
+            userService, getBulletinRepository(), processScheduler, 
Boolean.TRUE.equals(isSiteToSiteSecure));
     }
 
     /**
@@ -1083,14 +1083,14 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             try {
                 flowFileRepository.close();
             } catch (final Throwable t) {
-                LOG.warn("Unable to shut down FlowFileRepository due to {}", 
new Object[]{t});
+                LOG.warn("Unable to shut down FlowFileRepository due to {}", 
new Object[] {t});
             }
 
             if (this.timerDrivenEngineRef.get().isTerminated() && 
eventDrivenEngineRef.get().isTerminated()) {
                 LOG.info("Controller has been terminated successfully.");
             } else {
                 LOG.warn("Controller hasn't terminated properly.  There exists 
an uninterruptable thread that "
-                        + "will take an indeterminate amount of time to stop.  
Might need to kill the program manually.");
+                    + "will take an indeterminate amount of time to stop.  
Might need to kill the program manually.");
             }
 
             if (externalSiteListener != null) {
@@ -1153,7 +1153,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      * @throws FlowSynchronizationException if updates to the controller 
failed. If this exception is thrown, then the controller should be considered 
unsafe to be used
      */
     public void synchronize(final FlowSynchronizer synchronizer, final 
DataFlow dataFlow)
-            throws FlowSerializationException, FlowSynchronizationException, 
UninheritableFlowException {
+        throws FlowSerializationException, FlowSynchronizationException, 
UninheritableFlowException {
         writeLock.lock();
         try {
             LOG.debug("Synchronizing controller with proposed flow");
@@ -1199,7 +1199,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      *
      * @param maxThreadCount
      *
-     * This method must be called while holding the write lock!
+     *            This method must be called while holding the write lock!
      */
     private void setMaxThreadCount(final int maxThreadCount, final FlowEngine 
engine, final AtomicInteger maxThreads) {
         if (maxThreadCount < 1) {
@@ -1267,7 +1267,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      * @throws ProcessorInstantiationException
      *
      * @throws IllegalStateException if no process group can be found with the 
ID of DTO or with the ID of the DTO's parentGroupId, if the template ID 
specified is invalid, or if the DTO's Parent
-     * Group ID changes but the parent group has incoming or outgoing 
connections
+     *             Group ID changes but the parent group has incoming or 
outgoing connections
      *
      * @throws NullPointerException if the DTO or its ID is null
      */
@@ -1371,7 +1371,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      *
      * @throws NullPointerException if either argument is null
      * @throws IllegalStateException if the snippet is not valid because a 
component in the snippet has an ID that is not unique to this flow, or because 
it shares an Input Port or Output Port at the
-     * root level whose name already exists in the given ProcessGroup, or 
because the Template contains a Processor or a Prioritizer whose class is not 
valid within this instance of NiFi.
+     *             root level whose name already exists in the given 
ProcessGroup, or because the Template contains a Processor or a Prioritizer 
whose class is not valid within this instance of NiFi.
      * @throws ProcessorInstantiationException if unable to instantiate a 
processor
      */
     public void instantiateSnippet(final ProcessGroup group, final 
FlowSnippetDTO dto) throws ProcessorInstantiationException {
@@ -2542,7 +2542,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         if (firstTimeAdded) {
             final ComponentLog componentLog = new SimpleProcessLogger(id, 
taskNode.getReportingTask());
             final ReportingInitializationContext config = new 
StandardReportingInitializationContext(id, taskNode.getName(),
-                    SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, 
this);
+                SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this);
 
             try {
                 task.initialize(config);
@@ -2888,7 +2888,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         readLock.lock();
         try {
             return heartbeatGeneratorFuture != null && 
!heartbeatGeneratorFuture.isCancelled()
-                    && heartbeatSenderFuture != null && 
!heartbeatSenderFuture.isCancelled();
+                && heartbeatSenderFuture != null && 
!heartbeatSenderFuture.isCancelled();
         } finally {
             readLock.unlock();
         }
@@ -2948,7 +2948,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
     /**
      * @return the DN of the Cluster Manager that we are currently connected 
to, if available. This will return null if the instance is not clustered or if 
the instance is clustered but the NCM's DN
-     * is not available - for instance, if cluster communications are not 
secure
+     *         is not available - for instance, if cluster communications are 
not secure
      */
     public String getClusterManagerDN() {
         readLock.lock();
@@ -3101,10 +3101,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             @Override
             public boolean isContentSame() {
                 return areEqual(event.getPreviousContentClaimContainer(), 
event.getContentClaimContainer())
-                        && areEqual(event.getPreviousContentClaimSection(), 
event.getContentClaimSection())
-                        && areEqual(event.getPreviousContentClaimIdentifier(), 
event.getContentClaimIdentifier())
-                        && areEqual(event.getPreviousContentClaimOffset(), 
event.getContentClaimOffset())
-                        && areEqual(event.getPreviousFileSize(), 
event.getFileSize());
+                    && areEqual(event.getPreviousContentClaimSection(), 
event.getContentClaimSection())
+                    && areEqual(event.getPreviousContentClaimIdentifier(), 
event.getContentClaimIdentifier())
+                    && areEqual(event.getPreviousContentClaimOffset(), 
event.getContentClaimOffset())
+                    && areEqual(event.getPreviousFileSize(), 
event.getFileSize());
             }
 
             @Override
@@ -3180,7 +3180,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         // Register a Provenance Event to indicate that we replayed the data.
         final ProvenanceEventRecord sendEvent = new 
StandardProvenanceEventRecord.Builder()
-            .setEventType(ProvenanceEventType.SEND)
+            .setEventType(ProvenanceEventType.DOWNLOAD)
             .setFlowFileUUID(provEvent.getFlowFileUuid())
             .setAttributes(provEvent.getAttributes(), Collections.<String, 
String> emptyMap())
             .setCurrentContentClaim(resourceClaim.getContainer(), 
resourceClaim.getSection(), resourceClaim.getId(), offset, size)
@@ -3297,7 +3297,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         // Create the ContentClaim
         final ResourceClaim resourceClaim = 
contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
-                event.getPreviousContentClaimSection(), 
event.getPreviousContentClaimIdentifier(), false);
+            event.getPreviousContentClaimSection(), 
event.getPreviousContentClaimIdentifier(), false);
 
         // Increment Claimant Count, since we will now be referencing the 
Content Claim
         contentClaimManager.incrementClaimantCount(resourceClaim);
@@ -3367,7 +3367,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         // Update the FlowFile Repository to indicate that we have added the 
FlowFile to the flow
         final StandardRepositoryRecord record = new 
StandardRepositoryRecord(queue, flowFileRecord);
         record.setDestination(queue);
-        
flowFileRepository.updateRepository(Collections.<RepositoryRecord>singleton(record));
+        flowFileRepository.updateRepository(Collections.<RepositoryRecord> 
singleton(record));
 
         // Enqueue the data
         queue.put(flowFileRecord);
@@ -3434,11 +3434,9 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 protocolSender.sendBulletins(message);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(
-                            String.format(
-                                    "Sending bulletins to cluster manager at 
%s",
-                                    dateFormatter.format(new Date())
-                            )
-                    );
+                        String.format(
+                            "Sending bulletins to cluster manager at %s",
+                            dateFormatter.format(new Date())));
                 }
 
             } catch (final UnknownServiceAddressException usae) {
@@ -3496,7 +3494,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                         escapedBulletin = 
BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), 
escapedBulletinMessage);
                     } else {
                         escapedBulletin = 
BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), 
bulletin.getSourceType(),
-                                bulletin.getSourceName(), 
bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
+                            bulletin.getSourceName(), bulletin.getCategory(), 
bulletin.getLevel(), escapedBulletinMessage);
                     }
                 } else {
                     escapedBulletin = bulletin;
@@ -3554,9 +3552,9 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 final long sendMillis = 
TimeUnit.NANOSECONDS.toMillis(sendNanos);
 
                 heartbeatLogger.info("Heartbeat created at {} and sent at {}; 
send took {} millis",
-                        dateFormatter.format(new 
Date(message.getHeartbeat().getCreatedTimestamp())),
-                        dateFormatter.format(new Date()),
-                        sendMillis);
+                    dateFormatter.format(new 
Date(message.getHeartbeat().getCreatedTimestamp())),
+                    dateFormatter.format(new Date()),
+                    sendMillis);
             } catch (final UnknownServiceAddressException usae) {
                 if (heartbeatLogger.isDebugEnabled()) {
                     heartbeatLogger.debug(usae.getMessage());

Reply via email to