NIFI-1833 - Addressed issues from PR review.

NIFI-1833 Moved AbstractListProcessor.java, EntityListing.java, and 
ListableEntity.java from nifi-standard-processors into nifi-processor-utils
Moved TestAbstractListProcessor.java into nifi-processor-utils
Set nifi-azure-nar's nar dependency back to nifi-standard-services-api-nar
Fixed failing integration tests (ITFetchAzureBlobStorage.java, 
ITListAzureBlobStorage.java, and ITPutAzureStorageBlob.java) and refactored 
them so they can run in parallel
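A common way to make such storage integration tests safe to run in parallel is to give each test its own uniquely named container so concurrent runs never collide. A minimal sketch of that idea — the helper class and method names below are hypothetical, not the actual AzureTestUtil.java code:

```java
import java.util.UUID;

public class AzureTestNaming {

    // Builds a container name that is unique per test run.
    // Assumption: Azure container names must be 3-63 characters of
    // lowercase letters, digits, and hyphens, starting with a letter or digit.
    public static String uniqueContainerName(String prefix) {
        // Strip hyphens from the UUID and lowercase everything to stay
        // within the container naming rules.
        String name = (prefix + UUID.randomUUID().toString().replace("-", "")).toLowerCase();
        return name.length() > 63 ? name.substring(0, 63) : name;
    }
}
```

Each IT then creates and deletes its own container, so the tests no longer share mutable state.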

NIFI-1833 Moved the security notice from the additional details documentation 
into the descriptions of the specific properties to which those notices apply
Added displayName usage to properties
Updated exception handling in FetchAzureBlobStorage.java and 
PutAzureBlobStorage.java so that flowfiles whose Output/InputStreamCallback 
fails are routed to the processor's failure relationship
Cleaned up dependencies in pom

NIFI-1833 Removed unnecessary calls to map on Optional in the onTrigger 
exception handling of FetchAzureBlobStorage.java and PutAzureBlobStorage.java
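The resulting pattern — remember the root cause raised inside the session's stream callback, then prefer it over the framework's wrapper when routing to failure — can be sketched in plain Java. StorageException and runCallback below are simplified stand-ins for the Azure SDK class and NiFi's ProcessSession/ProcessException, not the real APIs:

```java
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackFailureDemo {

    // Stand-in for the checked exception the storage SDK throws inside the callback.
    public static class StorageException extends Exception {
        public StorageException(String msg) { super(msg); }
    }

    public interface StreamCallback {
        void process() throws IOException;
    }

    // Runs the callback the way a session would: IOExceptions are surfaced
    // as unchecked exceptions (ProcessException in NiFi).
    static void runCallback(StreamCallback cb) {
        try {
            cb.process();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Mirrors the onTrigger catch block: if no callback failure was recorded,
    // rethrow so the framework handles it; otherwise return the exception to
    // log before penalizing and routing the flowfile to failure.
    static Exception transferOrRethrow(AtomicReference<Exception> storedException, RuntimeException e) {
        if (storedException.get() == null) {
            throw e;
        }
        return Optional.ofNullable(storedException.get()).orElse(e);
    }

    // Simulates a fetch: returns null on success, or the root-cause
    // exception that would be logged before routing to failure.
    public static Exception fetch(boolean fail) {
        AtomicReference<Exception> storedException = new AtomicReference<>();
        try {
            runCallback(() -> {
                if (fail) {
                    StorageException cause = new StorageException("download failed");
                    storedException.set(cause); // remember the root cause before wrapping it
                    throw new IOException(cause);
                }
            });
            return null;
        } catch (RuntimeException e) {
            return transferOrRethrow(storedException, e);
        }
    }
}
```

The AtomicReference is needed because the callback runs inside the session and can only signal failure by throwing a wrapped IOException; storing the original exception lets the catch block log the real cause instead of the wrapper.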

NIFI-1833 Updates due to nifi-processor-utils being moved under nifi-nar-bundles

This closes #1719.

Signed-off-by: Bryan Rosander <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/26d90fbc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/26d90fbc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/26d90fbc

Branch: refs/heads/master
Commit: 26d90fbccfa2c4c376e663f6be9c12856decc592
Parents: f30c816
Author: Jeff Storck <[email protected]>
Authored: Sat Apr 29 20:23:35 2017 -0400
Committer: Bryan Rosander <[email protected]>
Committed: Tue May 2 14:39:46 2017 -0400

----------------------------------------------------------------------
 .../nifi-azure-bundle/nifi-azure-nar/pom.xml    |   2 +-
 .../nifi-azure-processors/pom.xml               |  33 +-
 .../azure/AbstractAzureBlobProcessor.java       |   4 +-
 .../azure/AbstractAzureProcessor.java           |   8 +-
 .../nifi/processors/azure/AzureConstants.java   |  18 +-
 .../azure/storage/FetchAzureBlobStorage.java    |  19 +-
 .../azure/storage/ListAzureBlobStorage.java     |  18 +-
 .../azure/storage/PutAzureBlobStorage.java      |  17 +-
 .../azure/storage/utils/BlobInfo.java           |   2 +-
 .../additionalDetails.html                      |  39 --
 .../additionalDetails.html                      |  39 --
 .../additionalDetails.html                      |  39 --
 .../azure/storage/AbstractAzureIT.java          |  97 ----
 .../processors/azure/storage/AzureTestUtil.java |  76 +++
 .../azure/storage/ITFetchAzureBlobStorage.java  |  58 +-
 .../azure/storage/ITListAzureBlobStorage.java   |  47 +-
 .../azure/storage/ITPutAzureStorageBlob.java    |  42 +-
 .../nifi-processor-utils/pom.xml                |   9 +
 .../util/list/AbstractListProcessor.java        | 521 ++++++++++++++++++
 .../nifi/processor/util/list/EntityListing.java |  71 +++
 .../processor/util/list/ListableEntity.java     |  40 ++
 .../util/list/TestAbstractListProcessor.java    | 528 +++++++++++++++++++
 .../standard/AbstractListProcessor.java         | 523 ------------------
 .../nifi/processors/standard/ListFile.java      |   1 +
 .../processors/standard/ListFileTransfer.java   |   1 +
 .../processors/standard/util/EntityListing.java |  71 ---
 .../nifi/processors/standard/util/FileInfo.java |   2 +
 .../standard/util/ListableEntity.java           |  40 --
 .../standard/TestAbstractListProcessor.java     | 527 ------------------
 .../nifi/processors/standard/TestListFile.java  |   1 +
 pom.xml                                         |   5 +
 31 files changed, 1404 insertions(+), 1494 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
index e6c3c9b..f75bb7f 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
@@ -38,7 +38,7 @@
         
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-standard-nar</artifactId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
             <type>nar</type>
         </dependency>  
     </dependencies>

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 9b4f28b..6133e30 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -19,9 +19,6 @@
     </parent>
     <artifactId>nifi-azure-processors</artifactId>
     <packaging>jar</packaging>
-    <properties>
-        <powermock.version>1.6.5</powermock.version>
-    </properties>
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -32,24 +29,10 @@
             <artifactId>nifi-utils</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-ssl-context-service-api</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro</artifactId>
-        </dependency>
-        <dependency>
             <groupId>com.microsoft.azure</groupId>
             <artifactId>azure-eventhubs</artifactId>
             <version>0.9.0</version>
         </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-            <version>2.8.6</version>
-        </dependency>
         <!--<dependency>
             <groupId>com.microsoft.eventhubs.client</groupId>
             <artifactId>eventhubs-client</artifactId>
@@ -66,26 +49,18 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-simple</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.powermock</groupId>
-            <artifactId>powermock-module-junit4</artifactId>
-            <version>${powermock.version}</version>
-            <scope>test</scope>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-standard-processors</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
index 2026711..f0729e6 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
@@ -25,8 +25,8 @@ import java.util.List;
 
 public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor {
 
-    public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build();
+    public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("blob").displayName("Blob").description("The filename of the blob")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build();
 
     private static final List<PropertyDescriptor> PROPERTIES = Collections
             .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
index c95ee99..5812236 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
@@ -34,7 +34,7 @@ public abstract class AbstractAzureProcessor extends AbstractProcessor {
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All successfully processed FlowFiles are routed to this relationship").build();
     public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Unsuccessful operations will be transferred to the failure relationship.").build();
-    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
 
     protected CloudStorageAccount createStorageConnection(ProcessContext context) {
         final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
@@ -67,11 +67,7 @@ public abstract class AbstractAzureProcessor extends AbstractProcessor {
      */
     private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
         CloudStorageAccount storageAccount;
-        try {
-            storageAccount = CloudStorageAccount.parse(storageConnectionString);
-        } catch (IllegalArgumentException | URISyntaxException | InvalidKeyException e) {
-            throw e;
-        }
+        storageAccount = CloudStorageAccount.parse(storageConnectionString);
         return storageAccount;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
index 9a51030..1e0cde3 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
@@ -23,14 +23,24 @@ public final class AzureConstants {
     public static final String BLOCK = "Block";
     public static final String PAGE = "Page";
 
-    public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("Storage Account Key").description("The storage account key")
+    public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("storage-account-key").displayName("Storage Account Key")
+            .description("The storage account key.  There are certain risks in allowing the account key to be stored as a flowfile" +
+                    "attribute. While it does provide for a more flexible flow by allowing the account key to " +
+                    "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
+                    "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+                    "In addition, the provenance repositories may be put on encrypted disk partitions.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
 
-    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("Storage Account Name").description("The storage account name")
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("storage-account-name").displayName("Storage Account Name")
+            .description("The storage account name.  There are certain risks in allowing the account name to be stored as a flowfile" +
+                    "attribute. While it does provide for a more flexible flow by allowing the account name to " +
+                    "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
+                    "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+                    "In addition, the provenance repositories may be put on encrypted disk partitions.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
 
-    public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("Container name").description("Name of the azure storage container")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build();
+    public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("container-name").displayName("Container name")
+            .description("Name of the azure storage container").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build();
 
     // use HTTPS by default as per MSFT recommendation
     public static final String FORMAT_DEFAULT_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
index 163a962..cd08ec1 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
@@ -23,11 +23,13 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -75,6 +77,7 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
         String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
         String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
 
+        AtomicReference<Exception> storedException = new AtomicReference<>();
         try {
             CloudStorageAccount storageAccount = createStorageConnection(context, flowFile);
             CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
@@ -89,6 +92,7 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
                 try {
                     blob.download(os);
                 } catch (StorageException e) {
+                    storedException.set(e);
                     throw new IOException(e);
                 }
             });
@@ -103,10 +107,15 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
             session.transfer(flowFile, REL_SUCCESS);
             final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
             session.getProvenanceReporter().fetch(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
-
-        } catch (IllegalArgumentException | URISyntaxException | StorageException e1) {
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_FAILURE);
+        } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException e) {
+            if (e instanceof ProcessException && storedException.get() == null) {
+                throw (ProcessException) e;
+            } else {
+                Exception failureException = Optional.ofNullable(storedException.get()).orElse(e);
+                getLogger().error("Failure to fetch Azure blob {}",  new Object[]{blobPath}, failureException);
+                flowFile = session.penalize(flowFile);
+                session.transfer(flowFile, REL_FAILURE);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index f8a6c4d..148f724 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -40,10 +40,10 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.list.AbstractListProcessor;
 import org.apache.nifi.processors.azure.AzureConstants;
 import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
 import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
-import org.apache.nifi.processors.standard.AbstractListProcessor;
 
 import com.microsoft.azure.storage.CloudStorageAccount;
 import com.microsoft.azure.storage.StorageException;
@@ -59,7 +59,9 @@ import com.microsoft.azure.storage.blob.ListBlobItem;
 @TriggerSerially
 @Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
 @SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class })
-@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage")
+@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage.  " +
+        "This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the " +
+        "previous node left off without duplicating all of the data.")
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
         @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
@@ -71,12 +73,14 @@ import com.microsoft.azure.storage.blob.ListBlobItem;
         @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"),
         @WritesAttribute(attribute = "lang", description = "Language code for the content"),
         @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
-@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. "
-        + "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.")
+@Stateful(scopes = { Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. " +
+        "This allows the Processor to list only blobs that have been added or modified after this date the next time that the Processor is run.  State is " +
+        "stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up " +
+        "where the previous node left off, without duplicating the data.")
 public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
 
-    private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true).required(false).build();
+    private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("prefix").displayName("Prefix").description("Search prefix for listing")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(false).build();
 
     private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
 
@@ -155,7 +159,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
         return listing;
     }
 
-    protected CloudStorageAccount createStorageConnection(ProcessContext context) {
+    private CloudStorageAccount createStorageConnection(ProcessContext context) {
         final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
         final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
         final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey);

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
index e03bc25..e164635 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
@@ -22,7 +22,9 @@ import java.io.InputStream;
 import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -69,6 +71,7 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
 
         String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
 
+        AtomicReference<Exception> storedException = new AtomicReference<>();
         try {
             CloudStorageAccount storageAccount = createStorageConnection(context, flowFile);
             CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
@@ -94,6 +97,7 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
                     attributes.put("azure.length", String.valueOf(length));
                     attributes.put("azure.timestamp", String.valueOf(properties.getLastModified()));
                 } catch (StorageException | URISyntaxException e) {
+                    storedException.set(e);
                     throw new IOException(e);
                 }
             });
@@ -106,10 +110,15 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
             final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
             session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
 
-        } catch (IllegalArgumentException | URISyntaxException | StorageException e) {
-            getLogger().error("Failed to put Azure blob {}", new Object[]{blobPath}, e);
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_FAILURE);
+        } catch (IllegalArgumentException | URISyntaxException | StorageException | ProcessException e) {
+            if (e instanceof ProcessException && storedException.get() == null) {
+                throw (ProcessException) e;
+            } else {
+                Exception failureException = Optional.ofNullable(storedException.get()).orElse(e);
+                getLogger().error("Failed to put Azure blob {}", new Object[]{blobPath}, failureException);
+                flowFile = session.penalize(flowFile);
+                session.transfer(flowFile, REL_FAILURE);
+            }
         }
 
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
index 6907d94..28a47ea 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
@@ -18,7 +18,7 @@ package org.apache.nifi.processors.azure.storage.utils;
 
 import java.io.Serializable;
 
-import org.apache.nifi.processors.standard.util.ListableEntity;
+import org.apache.nifi.processor.util.list.ListableEntity;
 
 public class BlobInfo implements Comparable<BlobInfo>, Serializable, ListableEntity {
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html
deleted file mode 100644
index b4b8e3b..0000000
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html
+++ /dev/null
@@ -1,39 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<head>
-    <meta charset="utf-8" />
-    <title>FetchAzureBlobStorage Processor</title>
-    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
-</head>
-
-<body>
-
-<h2>Apache NiFi Azure Processors</h2>
-
-<h3>Important Security Note</h3>
-<p>
-    There are certain risks in allowing the account name and key to be stored as flowfile
-    attributes. While it does provide for a more flexible flow by allowing the account name and key
-    be fetched dynamically from the flow file attributes, care must be taken to restrict access to
-    the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
-    In addition, the provenance repositories may be put on encrypted disk partitions.
-</p>
-<p>
-    <a href="#" onclick="history.back()">Return to a previous page</a>
-</p>
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html
deleted file mode 100644
index 76e8775..0000000
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html
+++ /dev/null
@@ -1,39 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<head>
-    <meta charset="utf-8" />
-    <title>ListAzureBlobStorage Processor</title>
-    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
-</head>
-
-<body>
-
-<h2>Apache NiFi Azure Processors</h2>
-
-<h3>Important Security Note</h3>
-<p>
-    There are certain risks in allowing the account name and key to be stored as flowfile
-    attributes. While it does provide for a more flexible flow by allowing the account name and key
-    be fetched dynamically from the flow file attributes, care must be taken to restrict access to
-    the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
-    In addition, the provenance repositories may be put on encrypted disk partitions.
-</p>
-<p>
-    <a href="#" onclick="history.back()">Return to a previous page</a>
-</p>
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html
deleted file mode 100644
index 0a7ff35..0000000
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html
+++ /dev/null
@@ -1,39 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<head>
-    <meta charset="utf-8" />
-    <title>PutAzureBlobStorage Processor</title>
-    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
-</head>
-
-<body>
-
-<h2>Apache NiFi Azure Processors</h2>
-
-<h3>Important Security Note</h3>
-<p>
-    There are certain risks in allowing the account name and key to be stored as flowfile
-    attributes. While it does provide for a more flexible flow by allowing the account name and key
-    be fetched dynamically from the flow file attributes, care must be taken to restrict access to
-    the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
-    In addition, the provenance repositories may be put on encrypted disk partitions.
-</p>
-<p>
-    <a href="#" onclick="history.back()">Return to a previous page</a>
-</p>
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
deleted file mode 100644
index 91a8c73..0000000
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.azure.storage;
-
-import com.microsoft.azure.storage.CloudStorageAccount;
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.blob.CloudBlobClient;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
-import com.microsoft.azure.storage.blob.ListBlobItem;
-import org.apache.nifi.processors.azure.AzureConstants;
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.security.InvalidKeyException;
-import java.util.Properties;
-
-import static org.junit.Assert.fail;
-
-public abstract class AbstractAzureIT {
-    protected static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
-    public static final String TEST_CONTAINER_NAME = "nifitest";
-
-    private static final Properties CONFIG;
-    protected static final String TEST_BLOB_NAME = "testing";
-    protected static final String TEST_TABLE_NAME = "testing";
-
-    static {
-        final FileInputStream fis;
-        CONFIG = new Properties();
-        try {
-            fis = new FileInputStream(CREDENTIALS_FILE);
-            try {
-                CONFIG.load(fis);
-            } catch (IOException e) {
-                fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
-            } finally {
-                FileUtils.closeQuietly(fis);
-            }
-        } catch (FileNotFoundException e) {
-            fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
-        }
-
-    }
-
-    @BeforeClass
-    public static void oneTimeSetup() throws StorageException, InvalidKeyException, URISyntaxException {
-        CloudBlobContainer container = getContainer();
-        container.createIfNotExists();
-    }
-
-    @AfterClass
-    public static void tearDown() throws InvalidKeyException, URISyntaxException, StorageException {
-        CloudBlobContainer container = getContainer();
-        for (ListBlobItem blob : container.listBlobs()) {
-            if (blob instanceof CloudBlob) {
-                ((CloudBlob) blob).delete(DeleteSnapshotsOption.INCLUDE_SNAPSHOTS, null, null, null);
-            }
-        }
-    }
-
-    public static String getAccountName() {
-        return CONFIG.getProperty("accountName");
-    }
-
-    public static String getAccountKey() {
-        return CONFIG.getProperty("accountKey");
-    }
-
-    protected static CloudBlobContainer getContainer() throws InvalidKeyException, URISyntaxException, StorageException {
-        String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, getAccountName(), getAccountKey());
-        CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
-        CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
-        return blobClient.getContainerReference(TEST_CONTAINER_NAME);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java
new file mode 100644
index 0000000..636845c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AzureTestUtil.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import static org.junit.Assert.fail;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Properties;
+
+import org.apache.nifi.processors.azure.AzureConstants;
+import org.apache.nifi.util.file.FileUtils;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+
+class AzureTestUtil {
+    private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
+    static final String TEST_CONTAINER_NAME_PREFIX = "nifitest";
+
+    private static final Properties CONFIG;
+    static final String TEST_BLOB_NAME = "testing";
+
+    static {
+        final FileInputStream fis;
+        CONFIG = new Properties();
+        try {
+            fis = new FileInputStream(CREDENTIALS_FILE);
+            try {
+                CONFIG.load(fis);
+            } catch (IOException e) {
+                fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+            } finally {
+                FileUtils.closeQuietly(fis);
+            }
+        } catch (FileNotFoundException e) {
+            fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+        }
+
+    }
+
+    static String getAccountName() {
+        return CONFIG.getProperty("accountName");
+    }
+
+    static String getAccountKey() {
+        return CONFIG.getProperty("accountKey");
+    }
+
+    static CloudBlobContainer getContainer(String containerName) throws InvalidKeyException, URISyntaxException, StorageException {
+        String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, getAccountName(), getAccountKey());
+        CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
+        CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
+        return blobClient.getContainerReference(containerName);
+    }
+
+}
\ No newline at end of file

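The new AzureTestUtil above loads account credentials from a properties file in the user's home directory inside a static initializer, failing fast when the file is missing or unreadable. A minimal standalone sketch of that pattern follows; the class name and behavior here are illustrative (it uses try-with-resources instead of FileUtils.closeQuietly and throws instead of calling JUnit's fail()):

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class CredentialsLoader {
    // Hypothetical helper mirroring AzureTestUtil's static initializer:
    // load credentials from a properties file, failing fast if unreadable.
    static Properties load(Path credentialsFile) {
        Properties config = new Properties();
        // try-with-resources closes the stream even when load() throws
        try (FileInputStream fis = new FileInputStream(credentialsFile.toFile())) {
            config.load(fis);
        } catch (IOException e) {
            throw new IllegalStateException(
                    "Could not open credentials file " + credentialsFile + ": " + e.getMessage(), e);
        }
        return config;
    }

    public static void main(String[] args) throws IOException {
        // Demonstrate with a temporary file instead of ~/azure-credentials.PROPERTIES
        Path tmp = Files.createTempFile("azure-credentials", ".properties");
        Files.write(tmp, "accountName=demo\naccountKey=secret\n".getBytes());
        Properties p = CredentialsLoader.load(tmp);
        System.out.println(p.getProperty("accountName")); // prints "demo"
    }
}
```

Failing fast here is deliberate: every integration test needs the credentials, so an unreadable file should abort the run immediately rather than produce a cascade of confusing per-test errors.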
http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
index 7dc8830..eeedd3b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
@@ -16,12 +16,15 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URISyntaxException;
 import java.security.InvalidKeyException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.nifi.processors.azure.AbstractAzureProcessor;
 import org.apache.nifi.processors.azure.AzureConstants;
@@ -31,32 +34,47 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
 import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
 
-public class ITFetchAzureBlobStorage extends AbstractAzureIT {
+public class ITFetchAzureBlobStorage {
 
     @Test
     public void testFetchingBlob() throws InvalidKeyException, URISyntaxException, StorageException, IOException {
+        String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
+        CloudBlobContainer container = AzureTestUtil.getContainer(containerName);
+        container.createIfNotExists();
+
+        CloudBlob blob = container.getBlockBlobReference(AzureTestUtil.TEST_BLOB_NAME);
+        byte[] buf = "0123456789".getBytes();
+        InputStream in = new ByteArrayInputStream(buf);
+        blob.upload(in, 10);
+
         final TestRunner runner = TestRunners.newTestRunner(new FetchAzureBlobStorage());
 
-        runner.setValidateExpressionUsage(true);
-
-        runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
-        runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
-        runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
-        runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}");
-
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put("azure.primaryUri", "http://" + getAccountName() + ".blob.core.windows.net/" + TEST_CONTAINER_NAME + "/" + TEST_BLOB_NAME);
-        attributes.put("azure.blobname", TEST_BLOB_NAME);
-        attributes.put("azure.blobtype", AzureConstants.BLOCK);
-        runner.enqueue(new byte[0], attributes);
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(AbstractAzureProcessor.REL_SUCCESS, 1);
-        List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
-        for (MockFlowFile flowFile : flowFilesForRelationship) {
-            flowFile.assertContentEquals("0123456789".getBytes());
-            flowFile.assertAttributeEquals("azure.length", "10");
+        try {
+            runner.setValidateExpressionUsage(true);
+
+            runner.setProperty(AzureConstants.ACCOUNT_NAME, AzureTestUtil.getAccountName());
+            runner.setProperty(AzureConstants.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
+            runner.setProperty(AzureConstants.CONTAINER, containerName);
+            runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}");
+
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put("azure.primaryUri", "https://" + AzureTestUtil.getAccountName() + ".blob.core.windows.net/" + containerName + "/" + AzureTestUtil.TEST_BLOB_NAME);
+            attributes.put("azure.blobname", AzureTestUtil.TEST_BLOB_NAME);
+            attributes.put("azure.blobtype", AzureConstants.BLOCK);
+            runner.enqueue(new byte[0], attributes);
+            runner.run();
+
+            runner.assertAllFlowFilesTransferred(AbstractAzureProcessor.REL_SUCCESS, 1);
+            List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
+            for (MockFlowFile flowFile : flowFilesForRelationship) {
+                flowFile.assertContentEquals("0123456789".getBytes());
+                flowFile.assertAttributeEquals("azure.length", "10");
+            }
+        } finally {
+            container.deleteIfExists();
         }
     }
 }

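The refactored integration test above follows the isolation pattern described in the commit message: each test derives a uniquely named container, does its work, and deletes the container in a finally block, so the ITs can run in parallel without clobbering shared state. The name-generation half of that pattern can be sketched in plain Java; the class name below is illustrative, the prefix mirrors AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, and no Azure SDK is involved:

```java
import java.util.UUID;

public class IsolatedResourceSketch {
    // Mirrors AzureTestUtil.TEST_CONTAINER_NAME_PREFIX in the diff above
    static final String PREFIX = "nifitest";

    // Each test derives its own container name, so concurrent test runs
    // never contend for (or delete) each other's blobs.
    static String uniqueContainerName() {
        // A random UUID suffix keeps names unique; UUID.toString() is
        // lowercase hex-and-dashes, which also fits Azure's container-naming rules.
        return String.format("%s-%s", PREFIX, UUID.randomUUID());
    }

    public static void main(String[] args) {
        System.out.println(uniqueContainerName());
    }
}
```

Pairing this with the tests' try/finally cleanup means container.deleteIfExists() runs even when an assertion fails, so a red build does not leak test containers into the storage account.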
http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
index 277538c..6dd9088 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
@@ -21,55 +21,50 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URISyntaxException;
 import java.security.InvalidKeyException;
+import java.util.UUID;
 
 import org.apache.nifi.processors.azure.AzureConstants;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
 
-public class ITListAzureBlobStorage extends AbstractAzureIT {
+public class ITListAzureBlobStorage {
 
-    @BeforeClass
-    public static void setupSomeFiles() throws InvalidKeyException, URISyntaxException, StorageException, IOException {
-        CloudBlobContainer container = getContainer();
+    @Test
+    public void testListsAzureBlobStorageContent() throws InvalidKeyException, StorageException, URISyntaxException, IOException {
+        String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
+        CloudBlobContainer container = AzureTestUtil.getContainer(containerName);
         container.createIfNotExists();
 
-        CloudBlob blob = container.getBlockBlobReference(TEST_BLOB_NAME);
+        CloudBlob blob = container.getBlockBlobReference(AzureTestUtil.TEST_BLOB_NAME);
         byte[] buf = "0123456789".getBytes();
         InputStream in = new ByteArrayInputStream(buf);
         blob.upload(in, 10);
-    }
 
-    @AfterClass
-    public static void tearDown() throws InvalidKeyException, URISyntaxException, StorageException {
-        CloudBlobContainer container = getContainer();
-        container.deleteIfExists();
-    }
-
-    @Test
-    public void testListsAzureBlobStorageContent() {
         final TestRunner runner = TestRunners.newTestRunner(new ListAzureBlobStorage());
 
-        runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
-        runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
-        runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
+        try {
+            runner.setProperty(AzureConstants.ACCOUNT_NAME, AzureTestUtil.getAccountName());
+            runner.setProperty(AzureConstants.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
+            runner.setProperty(AzureConstants.CONTAINER, containerName);
 
-        // requires multiple runs to deal with List processor checking
-        runner.run(3);
+            // requires multiple runs to deal with List processor checking
+            runner.run(3);
 
-        runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1);
-        runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1);
+            runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1);
+            runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1);
 
-        for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) {
-            entry.assertAttributeEquals("azure.length", "10");
-            entry.assertAttributeEquals("mime.type", "application/octet-stream");
+            for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) {
+                entry.assertAttributeEquals("azure.length", "10");
+                entry.assertAttributeEquals("mime.type", "application/octet-stream");
+            }
+        } finally {
+            container.deleteIfExists();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
index 0308add..5cecdbc 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
@@ -17,7 +17,10 @@
 package org.apache.nifi.processors.azure.storage;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.nifi.processors.azure.AzureConstants;
 import org.apache.nifi.util.MockFlowFile;
@@ -25,27 +28,38 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
-public class ITPutAzureStorageBlob extends AbstractAzureIT {
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+
+public class ITPutAzureStorageBlob {
 
     @Test
-    public void testPuttingBlob() throws IOException {
+    public void testPuttingBlob() throws IOException, InvalidKeyException, StorageException, URISyntaxException {
+        String containerName = String.format("%s-%s", AzureTestUtil.TEST_CONTAINER_NAME_PREFIX, UUID.randomUUID());
+        CloudBlobContainer container = AzureTestUtil.getContainer(containerName);
+        container.createIfNotExists();
+
         final TestRunner runner = TestRunners.newTestRunner(new PutAzureBlobStorage());
 
-        runner.setValidateExpressionUsage(true);
+        try {
+            runner.setValidateExpressionUsage(true);
 
-        runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
-        runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
-        runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
-        runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload");
+            runner.setProperty(AzureConstants.ACCOUNT_NAME, AzureTestUtil.getAccountName());
+            runner.setProperty(AzureConstants.ACCOUNT_KEY, AzureTestUtil.getAccountKey());
+            runner.setProperty(AzureConstants.CONTAINER, containerName);
+            runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload");
 
-        runner.enqueue("0123456789".getBytes());
-        runner.run();
+            runner.enqueue("0123456789".getBytes());
+            runner.run();
 
-        runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
-        List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
-        for (MockFlowFile flowFile : flowFilesForRelationship) {
-            flowFile.assertContentEquals("0123456789".getBytes());
-            flowFile.assertAttributeEquals("azure.length", "10");
+            runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
+            List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
+            for (MockFlowFile flowFile : flowFilesForRelationship) {
+                flowFile.assertContentEquals("0123456789".getBytes());
+                flowFile.assertAttributeEquals("azure.length", "10");
+            }
+        } finally {
+            container.deleteIfExists();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
index f7bcb43..711ddae 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
@@ -55,6 +55,15 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
             <scope>test</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
new file mode 100644
index 0000000..2666e2c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processor.util.list;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * <p>
+ * An Abstract Processor that is intended to simplify the coding required in 
order to perform Listing operations of remote resources.
+ * Those remote resources may be files, "objects", "messages", or any other 
sort of entity that may need to be listed in such a way that
+ * we identify the entity only once. Each of these objects, messages, etc. is referred to as an "entity" for the scope of this Processor.
+ * </p>
+ * <p>
+ * This class is responsible for triggering the listing to occur, filtering 
the results returned such that only new (unlisted) entities
+ * or entities that have been modified will be emitted from the Processor.
+ * </p>
+ * <p>
+ * In order to make use of this abstract class, the entities listed must meet 
the following criteria:
+ * </p>
+ * <ul>
+ * <li>
+ * Entity must have a timestamp associated with it. This timestamp is used to 
determine if entities are "new" or not. Any entity that is
+ * returned by the listing will be considered "new" if the timestamp is later 
than the latest timestamp pulled.
+ * </li>
+ * <li>
+ * If the timestamp of an entity is before OR equal to the latest timestamp 
pulled, then the entity is not considered new. If the timestamp is later
+ * than the last timestamp pulled, then the entity is considered new.
+ * </li>
+ * <li>
+ * Entity must have a user-readable name that can be used for logging purposes.
+ * </li>
+ * </ul>
+ * <p>
+ * This class persists state across restarts so that even if NiFi is 
restarted, duplicates will not be pulled from the target system given the above 
criteria. This is
+ * performed using the {@link StateManager}. This allows the system to be 
restarted and begin processing where it left off. The state that is stored is 
the latest timestamp
+ * that has been pulled (as determined by the timestamps of the entities that 
are returned). See the section above for information about how this information is used in order to
+ * determine new entities.
+ * </p>
+ * <p>
+ * NOTE: This processor migrates legacy state mechanisms, both locally stored file-based state and state optionally stored via the <code>Distributed Cache
+ * Service</code> property, to the new {@link StateManager} functionality. Upon successful migration, the associated data from one or both of the legacy mechanisms is purged.
+ * </p>
+ * <p>
+ * For each new entity that is listed, the Processor will send a FlowFile to 
the 'success' relationship. The FlowFile will have no content but will have 
some set
+ * of attributes (defined by the concrete implementation) that can be used to 
fetch those remote resources or interact with them in whatever way makes sense 
for
+ * the configured dataflow.
+ * </p>
+ * <p>
+ * Subclasses are responsible for the following:
+ * </p>
+ * <ul>
+ * <li>
+ * Perform a listing of remote resources. The subclass will implement the 
{@link #performListing(ProcessContext, Long)} method, which creates a listing 
of all
+ * entities on the remote system that have timestamps later than the provided 
timestamp. If the entities returned have a timestamp before the provided one, 
those
+ * entities will be filtered out. It is therefore not necessary for implementations to perform this filtering themselves, but the timestamp is provided in order to give the implementation the ability
+ * to filter those resources on the server side rather than pulling back all 
of the information, if it makes sense to do so in the concrete implementation.
+ * </li>
+ * <li>
+ * Creating a Map of attributes that are applicable for an entity. The 
attributes that are assigned to each FlowFile are exactly those returned by the
+ * {@link #createAttributes(ListableEntity, ProcessContext)} method.
+ * </li>
+ * <li>
+ * Returning the configured path. Many resources can be comprised of a "path" 
(or a "container" or "bucket", etc.) as well as a name or identifier that is unique only
+ * within that path. The {@link #getPath(ProcessContext)} method is 
responsible for returning the path that is currently being polled for entities. If this concept
+ * does not apply for the concrete implementation, it is recommended that the 
concrete implementation return "." or "/" for all invocations of this method.
+ * </li>
+ * <li>
+ * Determining when the listing must be cleared. It is sometimes necessary to 
clear state about which entities have already been ingested, as the result of a 
user
+ * changing a property value. The {@link 
#isListingResetNecessary(PropertyDescriptor)} method is responsible for 
determining when the listing needs to be reset by returning
+ * a boolean indicating whether or not a change in the value of the provided 
property should trigger the timestamp and identifier information to be cleared.
+ * </li>
+ * </ul>
+ */
+@TriggerSerially
+@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After a 
listing of resources is performed, the latest timestamp of any of the resources 
is stored in the component's state. "
+    + "The scope used depends on the implementation.")
+public abstract class AbstractListProcessor<T extends ListableEntity> extends 
AbstractProcessor {
+
+    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new 
PropertyDescriptor.Builder()
+        .name("Distributed Cache Service")
+        .description("Specifies the Controller Service that should be used to 
maintain state about what has been pulled from the remote server so that if a 
new node "
+            + "begins pulling data, it won't duplicate all of the work that 
has been done. If not specified, the information will not be shared across the 
cluster. "
+            + "This property does not need to be set for standalone instances 
of NiFi but should be configured if NiFi is run within a cluster.")
+        .required(false)
+        .identifiesControllerService(DistributedMapCacheClient.class)
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles that are created as a result of a listing are routed to this relationship")
+        .build();
+
+    private volatile Long lastListingTime = null;
+    private volatile Long lastProcessedTime = 0L;
+    private volatile Long lastRunTime = 0L;
+    private volatile boolean justElectedPrimaryNode = false;
+    private volatile boolean resetState = false;
+
+    /*
+     * A constant used in determining an internal "yield" of processing files. Given the logic to provide a pause on the newest
+     * files according to timestamp, this ensures that at least the specified number of millis has elapsed, so that the processor is not scheduled
+     * nearly instantaneously after the prior iteration, which would effectively void the built-in buffer
+     */
+    public static final long LISTING_LAG_NANOS = 
TimeUnit.MILLISECONDS.toNanos(100L);
+    static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
+    static final String PROCESSED_TIMESTAMP_KEY = "processed.timestamp";
+
+    public File getPersistenceFile() {
+        return new File("conf/state/" + getIdentifier());
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(DISTRIBUTED_CACHE_SERVICE);
+        return properties;
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        if (isConfigurationRestored() && isListingResetNecessary(descriptor)) {
+            resetTimeStates(); // clear lastListingTime so that we have to 
fetch new time
+            resetState = true;
+        }
+    }
+
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        return relationships;
+    }
+
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
+        justElectedPrimaryNode = (newState == 
PrimaryNodeState.ELECTED_PRIMARY_NODE);
+    }
+
+    @OnScheduled
+    public final void updateState(final ProcessContext context) throws 
IOException {
+        final String path = getPath(context);
+        final DistributedMapCacheClient client = 
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+
+        // Check if state already exists for this path. If so, we have already 
migrated the state.
+        final StateMap stateMap = 
context.getStateManager().getState(getStateScope(context));
+        if (stateMap.getVersion() == -1L) {
+            try {
+                // Migrate state from the old way of managing state 
(distributed cache service and local file)
+                // to the new mechanism (State Manager).
+                migrateState(path, client, context.getStateManager(), 
getStateScope(context));
+            } catch (final IOException ioe) {
+                throw new IOException("Failed to properly migrate state to 
State Manager", ioe);
+            }
+        }
+
+        // When scheduled to run, check if the associated timestamp is null, 
signifying a clearing of state and reset the internal timestamp
+        if (lastListingTime != null && stateMap.get(LISTING_TIMESTAMP_KEY) == 
null) {
+            getLogger().info("Detected that state was cleared for this 
component.  Resetting internal values.");
+            resetTimeStates();
+        }
+
+        if (resetState) {
+            context.getStateManager().clear(getStateScope(context));
+            resetState = false;
+        }
+    }
+
+    /**
+     * This processor used to use the DistributedMapCacheClient in order to 
store cluster-wide state, before the introduction of
+     * the StateManager. This method will migrate state from that 
DistributedMapCacheClient, or from a local file, to the StateManager,
+     * if any state already exists. More specifically, this will extract out 
the relevant timestamp for when the processor last ran
+     *
+     * @param path         the path to migrate state for
+     * @param client       the DistributedMapCacheClient that is capable of 
obtaining the current state
+     * @param stateManager the StateManager to use in order to store the new 
state
+     * @param scope        the scope to use
+     * @throws IOException if unable to retrieve or store the state
+     */
+    private void migrateState(final String path, final 
DistributedMapCacheClient client, final StateManager stateManager, final Scope 
scope) throws IOException {
+        Long minTimestamp = null;
+
+        // Retrieve state from Distributed Cache Client, establishing the 
latest file seen
+        if (client != null) {
+            final StringSerDe serde = new StringSerDe();
+            final String serializedState = client.get(getKey(path), serde, serde);
+            if (serializedState != null && !serializedState.isEmpty()) {
+                final EntityListing listing = deserialize(serializedState);
+                minTimestamp = listing.getLatestTimestamp().getTime();
+            }
+
+            // remove the entry from the distributed cache server, using the same key it was stored under
+            try {
+                client.remove(getKey(path), serde);
+            } catch (final IOException ioe) {
+                getLogger().warn("Failed to remove entry from Distributed Cache Service. However, the state has already been migrated to use the new "
+                    + "State Management service, so the Distributed Cache Service is no longer needed.");
+            }
+        }
+
+        // Retrieve state from locally persisted file, and compare these to 
the minTimestamp established from the distributedCache, if there was one
+        final File persistenceFile = getPersistenceFile();
+        if (persistenceFile.exists()) {
+            final Properties props = new Properties();
+
+            try (final FileInputStream fis = new 
FileInputStream(persistenceFile)) {
+                props.load(fis);
+            }
+
+            final String locallyPersistedValue = props.getProperty(path);
+            if (locallyPersistedValue != null) {
+                final EntityListing listing = 
deserialize(locallyPersistedValue);
+                final long localTimestamp = 
listing.getLatestTimestamp().getTime();
+                // if the local file's latest timestamp is beyond that of the 
value provided from the cache, replace
+                if (minTimestamp == null || localTimestamp > minTimestamp) {
+                    minTimestamp = localTimestamp;
+                }
+            }
+
+            // delete the local file, since it is no longer needed
+            if (persistenceFile.exists() && !persistenceFile.delete()) {
+                getLogger().warn("Migrated state but failed to delete local 
persistence file");
+            }
+        }
+
+        if (minTimestamp != null) {
+            persist(minTimestamp, minTimestamp, stateManager, scope);
+        }
+    }
+
+    private void persist(final long listingTimestamp, final long 
processedTimestamp, final StateManager stateManager, final Scope scope) throws 
IOException {
+        final Map<String, String> updatedState = new HashMap<>(1);
+        updatedState.put(LISTING_TIMESTAMP_KEY, 
String.valueOf(listingTimestamp));
+        updatedState.put(PROCESSED_TIMESTAMP_KEY, 
String.valueOf(processedTimestamp));
+        stateManager.setState(updatedState, scope);
+    }
+
+    protected String getKey(final String directory) {
+        return getIdentifier() + ".lastListingTime." + directory;
+    }
+
+    private EntityListing deserialize(final String serializedState) throws 
JsonParseException, JsonMappingException, IOException {
+        final ObjectMapper mapper = new ObjectMapper();
+        final JsonNode jsonNode = mapper.readTree(serializedState);
+        return mapper.readValue(jsonNode, EntityListing.class);
+    }
+
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        Long minTimestamp = lastListingTime;
+
+        if (this.lastListingTime == null || this.lastProcessedTime == null || 
justElectedPrimaryNode) {
+            try {
+                // Attempt to retrieve state from the state manager if a last 
listing was not yet established or
+                // if just elected the primary node
+                final StateMap stateMap = 
context.getStateManager().getState(getStateScope(context));
+                final String listingTimestampString = 
stateMap.get(LISTING_TIMESTAMP_KEY);
+                final String lastProcessedString = stateMap.get(PROCESSED_TIMESTAMP_KEY);
+                if (lastProcessedString != null) {
+                    this.lastProcessedTime = 
Long.parseLong(lastProcessedString);
+                }
+                if (listingTimestampString != null) {
+                    minTimestamp = Long.parseLong(listingTimestampString);
+                    // If our determined timestamp is the same as that of our 
last listing, skip this execution as there are no updates
+                    if (minTimestamp.equals(this.lastListingTime)) {
+                        context.yield();
+                        return;
+                    } else {
+                        this.lastListingTime = minTimestamp;
+                    }
+                }
+                justElectedPrimaryNode = false;
+            } catch (final IOException ioe) {
+                getLogger().error("Failed to retrieve timestamp of last 
listing from the State Manager. Will not perform listing until this is 
accomplished.");
+                context.yield();
+                return;
+            }
+        }
+
+        final List<T> entityList;
+        final long currentListingTimestamp = System.nanoTime();
+        try {
+            // keep track of when this last executed, for consideration of the lag nanos
+            entityList = performListing(context, minTimestamp);
+        } catch (final IOException e) {
+            getLogger().error("Failed to perform listing on remote host due to 
{}", e);
+            context.yield();
+            return;
+        }
+
+        if (entityList == null || entityList.isEmpty()) {
+            context.yield();
+            return;
+        }
+
+        Long latestListingTimestamp = null;
+        final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
+
+        // Build a sorted map to determine the latest possible entries
+        for (final T entity : entityList) {
+            final long entityTimestamp = entity.getTimestamp();
+            // New entries are all those that occur at or after the associated 
timestamp
+            final boolean newEntry = minTimestamp == null || (entityTimestamp >= minTimestamp && entityTimestamp > lastProcessedTime);
+
+            if (newEntry) {
+                List<T> entitiesForTimestamp = 
orderedEntries.get(entity.getTimestamp());
+                if (entitiesForTimestamp == null) {
+                    entitiesForTimestamp = new ArrayList<T>();
+                    orderedEntries.put(entity.getTimestamp(), 
entitiesForTimestamp);
+                }
+                entitiesForTimestamp.add(entity);
+            }
+        }
+
+        int flowfilesCreated = 0;
+
+        if (orderedEntries.size() > 0) {
+            latestListingTimestamp = orderedEntries.lastKey();
+
+            // If the last listing time is equal to the newest entries 
previously seen,
+            // another iteration has occurred without new files and special 
handling is needed to avoid starvation
+            if (latestListingTimestamp.equals(lastListingTime)) {
+                /* We yield when either:
+                 *   - we have not eclipsed the minimal listing lag needed, due to being triggered too soon after the last run, or
+                 *   - the latest listing timestamp is equal to the last processed time, meaning we already handled the items originally passed over
+                 */
+                if (System.nanoTime() - lastRunTime < LISTING_LAG_NANOS || 
latestListingTimestamp.equals(lastProcessedTime)) {
+                    context.yield();
+                    return;
+                }
+
+            } else if (latestListingTimestamp >= currentListingTimestamp - 
LISTING_LAG_NANOS) {
+                // Otherwise, the newest entries are held back one cycle so that writes occurring exactly while the listing is being performed do not cause data to be missed
+                orderedEntries.remove(latestListingTimestamp);
+            }
+
+            for (List<T> timestampEntities : orderedEntries.values()) {
+                for (T entity : timestampEntities) {
+                    // Create the FlowFile for this path.
+                    final Map<String, String> attributes = 
createAttributes(entity, context);
+                    FlowFile flowFile = session.create();
+                    flowFile = session.putAllAttributes(flowFile, attributes);
+                    session.transfer(flowFile, REL_SUCCESS);
+                    flowfilesCreated++;
+                }
+            }
+        }
+
+        // As long as we have a listing timestamp, there is meaningful state 
to capture regardless of any outputs generated
+        if (latestListingTimestamp != null) {
+            boolean processedNewFiles = flowfilesCreated > 0;
+            if (processedNewFiles) {
+                // If there have been files created, update the last timestamp 
we processed
+                lastProcessedTime = orderedEntries.lastKey();
+                getLogger().info("Successfully created listing with {} new 
objects", new Object[]{flowfilesCreated});
+                session.commit();
+            }
+
+            lastRunTime = System.nanoTime();
+
+            if (!latestListingTimestamp.equals(lastListingTime) || 
processedNewFiles) {
+                // We have performed a listing and pushed any FlowFiles out 
that may have been generated
+                // Now, we need to persist state about the Last Modified 
timestamp of the newest file
+                // that we evaluated. We do this in order to avoid pulling in 
the same file twice.
+                // However, we want to save the state both locally and 
remotely.
+                // We store the state remotely so that if a new Primary Node 
is chosen, it can pick up where the
+                // previous Primary Node left off.
+                // We also store the state locally so that if the node is 
restarted, and the node cannot contact
+                // the distributed state cache, the node can continue to run 
(if it is primary node).
+                try {
+                    lastListingTime = latestListingTimestamp;
+                    persist(latestListingTimestamp, lastProcessedTime, 
context.getStateManager(), getStateScope(context));
+                } catch (final IOException ioe) {
+                    getLogger().warn("Unable to save state due to {}. If NiFi 
is restarted before state is saved, or "
+                        + "if another node begins executing this Processor, 
data duplication may occur.", ioe);
+                }
+            }
+
+        } else {
+            getLogger().debug("There is no data to list. Yielding.");
+            context.yield();
+
+            // lastListingTime = 0 so that we don't continually poll the 
distributed cache / local file system
+            if (lastListingTime == null) {
+                lastListingTime = 0L;
+            }
+
+            return;
+        }
+    }
+
+    private void resetTimeStates() {
+        lastListingTime = null;
+        lastProcessedTime = 0L;
+        lastRunTime = 0L;
+    }
+
+    /**
+     * Creates a Map of attributes that should be applied to the FlowFile to 
represent this entity. This processor will emit a FlowFile for each "new" entity
+     * (see the documentation for this class for a discussion of how this 
class determines whether or not an entity is "new"). The FlowFile will contain 
no
+     * content. The attributes that will be included are exactly the 
attributes that are returned by this method.
+     *
+     * @param entity  the entity represented by the FlowFile
+     * @param context the ProcessContext for obtaining configuration 
information
+     * @return a Map of attributes for this entity
+     */
+    protected abstract Map<String, String> createAttributes(T entity, 
ProcessContext context);
+
+    /**
+     * Returns the path to perform a listing on.
+     * Many resources can be comprised of a "path" (or a "container" or 
"bucket", etc.) as well as a name or identifier that is unique only
+     * within that path. This method is responsible for returning the path 
that is currently being polled for entities. If this concept
+     * does not apply for the concrete implementation, it is recommended that 
the concrete implementation return "." or "/" for all invocations of this 
method.
+     *
+     * @param context the ProcessContext to use in order to obtain configuration
+     * @return the path that is to be used to perform the listing, or 
<code>null</code> if not applicable.
+     */
+    protected abstract String getPath(final ProcessContext context);
+
+    /**
+     * Performs a listing of the remote entities that can be pulled. If any 
entity that is returned has already been "discovered" or "emitted"
+     * by this Processor, it will be ignored. A discussion of how the 
Processor determines those entities that have already been emitted is
+     * provided above in the documentation for this class. Any entity that is 
returned by this method with a timestamp prior to the minTimestamp
+     * will be filtered out by the Processor. Therefore, it is not necessary 
that implementations perform this filtering but can be more efficient
+     * if the filtering can be performed on the server side prior to 
retrieving the information.
+     *
+     * @param context      the ProcessContext to use in order to pull the 
appropriate entities
+     * @param minTimestamp the minimum timestamp of entities that should be 
returned.
+     * @return a Listing of entities that have a timestamp >= minTimestamp
+     */
+    protected abstract List<T> performListing(final ProcessContext context, 
final Long minTimestamp) throws IOException;
+
+    /**
+     * Determines whether or not the listing must be reset if the value of the 
given property is changed
+     *
+     * @param property the property that has changed
+     * @return <code>true</code> if a change in value of the given property 
necessitates that the listing be reset, <code>false</code> otherwise.
+     */
+    protected abstract boolean isListingResetNecessary(final 
PropertyDescriptor property);
+
+    /**
+     * Returns a Scope that specifies where the state should be managed for 
this Processor
+     *
+     * @param context the ProcessContext to use in order to make a 
determination
+     * @return a Scope that specifies where the state should be managed for 
this Processor
+     */
+    protected abstract Scope getStateScope(final ProcessContext context);
+
+
+    private static class StringSerDe implements Serializer<String>, 
Deserializer<String> {
+        @Override
+        public String deserialize(final byte[] value) throws 
DeserializationException, IOException {
+            if (value == null) {
+                return null;
+            }
+
+            return new String(value, StandardCharsets.UTF_8);
+        }
+
+        @Override
+        public void serialize(final String value, final OutputStream out) 
throws SerializationException, IOException {
+            out.write(value.getBytes(StandardCharsets.UTF_8));
+        }
+    }
+}
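The onTrigger logic above compares boxed Long timestamps. A plain-Java sketch (no NiFi dependencies; the class name is arbitrary) of why `==` is unreliable for such values and `equals()` is required:

```java
public class BoxedLongDemo {
    public static void main(String[] args) {
        // Autoboxed values from -128 to 127 are cached by Long.valueOf, so
        // reference comparison happens to work for small values...
        Long small1 = 100L;
        Long small2 = 100L;
        System.out.println(small1 == small2);   // true, only by accident of the cache

        // ...but realistic epoch-millis timestamps fall outside the cache, so
        // == compares object references rather than values
        Long ts1 = 1493500000000L;
        Long ts2 = 1493500000000L;
        System.out.println(ts1 == ts2);         // false on standard JDKs
        System.out.println(ts1.equals(ts2));    // true: value comparison
    }
}
```

This is why a skip-if-unchanged check on two independently boxed timestamps must use `equals()`.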

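The TreeMap bookkeeping in onTrigger (group entities by timestamp, take lastKey() as the latest, hold the newest bucket back one cycle) can be exercised standalone. A simplified sketch using bare timestamps in place of listed entities:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class OrderedEntriesDemo {
    public static void main(String[] args) {
        // timestamps of listed "entities"; two share the newest timestamp
        long[] timestamps = {100L, 200L, 300L, 300L};

        final TreeMap<Long, List<Long>> orderedEntries = new TreeMap<>();
        for (final long ts : timestamps) {
            orderedEntries.computeIfAbsent(ts, k -> new ArrayList<>()).add(ts);
        }

        // TreeMap keeps keys sorted, so lastKey() is the latest timestamp seen
        final long latest = orderedEntries.lastKey();
        System.out.println(latest); // 300

        // Hold the newest bucket back one cycle, as the processor does for
        // entries that may still be receiving writes
        orderedEntries.remove(latest);
        System.out.println(orderedEntries.keySet()); // [100, 200]
    }
}
```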
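migrateState resolves the two legacy timestamps (distributed cache vs. local file) by preferring the later one. That precedence rule, extracted into a standalone sketch (the timestamp values are made up for illustration):

```java
public class MigrationTimestampDemo {
    // Mirror of migrateState's precedence rule: a null cache value defers to
    // the local value; otherwise the later (larger) timestamp wins
    static Long resolve(Long cacheTimestamp, Long localTimestamp) {
        Long minTimestamp = cacheTimestamp;
        if (localTimestamp != null && (minTimestamp == null || localTimestamp > minTimestamp)) {
            minTimestamp = localTimestamp;
        }
        return minTimestamp;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, 500L)); // 500: only the local file had state
        System.out.println(resolve(400L, 500L)); // 500: local file is newer
        System.out.println(resolve(600L, 500L)); // 600: cache entry is newer
    }
}
```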