Repository: nifi
Updated Branches:
  refs/heads/master d01449ee7 -> 1a512cd1e


NIFI-1484 Making use of timestamps at various points of execution to provide a
listing of all but the latest files, which are held until a subsequent execution.

Correcting the nifi-amqp-bundle's pom description.

This closes #212.
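
For illustration only, and not part of this change set: a minimal sketch, in plain Java, of the
hold-back rule described above, assuming hypothetical entries grouped by their millisecond
timestamps. Everything older than the newest timestamp in a listing is released immediately,
while entries sharing the newest timestamp wait for a subsequent execution.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.TreeMap;

    final class HoldBackSketch {
        // Hypothetical helper, not the processor's API: given listed entries grouped by
        // timestamp, return those safe to emit now. Entries sharing the newest timestamp
        // are withheld until a later run, once no newer writes have appeared.
        static <T> List<T> releasableNow(final TreeMap<Long, List<T>> entriesByTimestamp) {
            if (entriesByTimestamp.isEmpty()) {
                return Collections.emptyList();
            }
            final long newest = entriesByTimestamp.lastKey();
            final List<T> releasable = new ArrayList<>();
            entriesByTimestamp.headMap(newest).values().forEach(releasable::addAll);
            return releasable;
        }
    }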


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1a512cd1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1a512cd1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1a512cd1

Branch: refs/heads/master
Commit: 1a512cd1e67e6d0231e9dcde9d32472fad4c5bd2
Parents: d01449e
Author: Aldrin Piri <ald...@apache.org>
Authored: Tue Feb 9 15:03:58 2016 -0500
Committer: Aldrin Piri <ald...@apache.org>
Committed: Wed Feb 10 10:25:47 2016 -0500

----------------------------------------------------------------------
 nifi-nar-bundles/nifi-amqp-bundle/pom.xml       |   2 +-
 .../standard/AbstractListProcessor.java         | 312 ++++++++++---------
 .../nifi/processors/standard/ListFile.java      |   4 +-
 .../nifi/processors/standard/ListSFTP.java      |   4 +-
 .../standard/TestAbstractListProcessor.java     | 219 ++++++++++---
 .../nifi/processors/standard/TestListFile.java  | 164 ++++++++--
 6 files changed, 498 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1a512cd1/nifi-nar-bundles/nifi-amqp-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/pom.xml 
b/nifi-nar-bundles/nifi-amqp-bundle/pom.xml
index 1ab9109..fff9905 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-amqp-bundle/pom.xml
@@ -23,7 +23,7 @@
     <artifactId>nifi-amqp-bundle</artifactId>
     <version>0.5.0-SNAPSHOT</version>
     <packaging>pom</packaging>
-    <description>A bundle of processors that run Flume sources/sinks</description>
+    <description>A bundle of processors that publish messages to and consume messages from AMQP.</description>
     <modules>
         <module>nifi-amqp-processors</module>
         <module>nifi-amqp-nar</module>

http://git-wip-us.apache.org/repos/asf/nifi/blob/1a512cd1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
index b04deb3..08b8b28 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -23,13 +23,13 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.TreeMap;
 
 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
@@ -64,59 +64,44 @@ import org.codehaus.jackson.map.ObjectMapper;
  * Those remote resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that
  * we identify the entity only once. Each of these objects, messages, etc. is referred to as an "entity" for the scope of this Processor.
  * </p>
- *
  * <p>
  * This class is responsible for triggering the listing to occur, filtering 
the results returned such that only new (unlisted) entities
  * or entities that have been modified will be emitted from the Processor.
  * </p>
- *
  * <p>
  * In order to make use of this abstract class, the entities listed must meet 
the following criteria:
  * </p>
- *
  * <ul>
- *  <li>
- *      Entity must have a timestamp associated with it. This timestamp is 
used to determine if entities are "new" or not. Any entity that is
- *      returned by the listing will be considered "new" if the timestamp is 
later than the latest timestamp pulled.
- *  </li>
- *  <li>
- *      Entity must have a unique identifier. This is used in conjunction with 
the timestamp in order to determine whether or not the entity is
- *      new. If the timestamp of an entity is before the latest timestamp 
pulled, then the entity is not considered new. If the timestamp is later
- *      than the last timestamp pulled, then the entity is considered new. If 
the timestamp is equal to the latest timestamp pulled, then the entity's
- *      identifier is compared to all of the entity identifiers that have that 
same timestamp in order to determine whether or not the entity has been
- *      seen already.
- *  </li>
- *  <li>
- *      Entity must have a user-readable name that can be used for logging 
purposes.
- *  </li>
+ * <li>
+ * Entity must have a timestamp associated with it. This timestamp is used to 
determine if entities are "new" or not. Any entity that is
+ * returned by the listing will be considered "new" if the timestamp is later 
than the latest timestamp pulled.
+ * </li>
+ * <li>
+ * If the timestamp of an entity is before OR equal to the latest timestamp 
pulled, then the entity is not considered new. If the timestamp is later
+ * than the last timestamp pulled, then the entity is considered new.
+ * </li>
+ * <li>
+ * Entity must have a user-readable name that can be used for logging purposes.
+ * </li>
  * </ul>
- *
  * <p>
- * This class persists state across restarts so that even if NiFi is 
restarted, duplicates will not be pulled from the remote system. This is 
performed using
- * two different mechanisms. First, state is stored locally. This allows the 
system to be restarted and begin processing where it left off. The state that is
- * stored is the latest timestamp that has been pulled (as determined by the 
timestamps of the entities that are returned), as well as the unique identifier 
of
- * each entity that has that timestamp. See the section above for information 
about how these pieces of information are used in order to determine entity 
uniqueness.
+ * This class persists state across restarts so that even if NiFi is 
restarted, duplicates will not be pulled from the target system given the above 
criteria. This is
+ * performed using the {@link StateManager}. This allows the system to be 
restarted and begin processing where it left off. The state that is stored is 
the latest timestamp
+ * that has been pulled (as determined by the timestamps of the entities that are returned). See the section above for information about how this timestamp is used in order to
+ * determine new entities.
  * </p>
- *
  * <p>
- * In addition to storing state locally, the Processor exposes an optional 
<code>Distributed Cache Service</code> property. In standalone deployment of 
NiFi, this is
- * not necessary. However, in a clustered environment, subclasses of this 
class are expected to be run only on primary node. While this means that the 
local state is
- * accurate as long as the primary node remains constant, the primary node in 
the cluster can be changed. As a result, if running in a clustered environment, 
it is
- * recommended that this property be set. This allows the same state that is 
described above to also be replicated across the cluster. If this property is 
set, then
- * on restart the Processor will not begin listing until it has retrieved an 
updated state from this service, as it does not know whether or not another 
node has
- * modified the state in the mean time.
+ * NOTE: This processor migrates legacy state mechanisms, including locally stored, file-based state and the optional <code>Distributed Cache
+ * Service</code> property, to the new {@link StateManager} functionality. Upon successful migration, the associated data from one or both of the legacy mechanisms is purged.
  * </p>
- *
  * <p>
  * For each new entity that is listed, the Processor will send a FlowFile to 
the 'success' relationship. The FlowFile will have no content but will have 
some set
  * of attributes (defined by the concrete implementation) that can be used to 
fetch those remote resources or interact with them in whatever way makes sense 
for
  * the configured dataflow.
  * </p>
- *
  * <p>
  * Subclasses are responsible for the following:
  * </p>
- *
  * <ul>
  * <li>
  * Perform a listing of remote resources. The subclass will implement the 
{@link #performListing(ProcessContext, Long)} method, which creates a listing 
of all
@@ -141,9 +126,10 @@ import org.codehaus.jackson.map.ObjectMapper;
  * </ul>
  */
 @TriggerSerially
-@Stateful(scopes={Scope.LOCAL, Scope.CLUSTER}, description="After a listing of 
resources is performed, the latest timestamp of any of the resources is stored 
in the component's state, "
-    + "along with all resources that have that same timestmap so that the 
Processor can avoid data duplication. The scope used depends on the 
implementation.")
+@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After a 
listing of resources is performed, the latest timestamp of any of the resources 
is stored in the component's state. "
+    + "The scope used depends on the implementation.")
 public abstract class AbstractListProcessor<T extends ListableEntity> extends 
AbstractProcessor {
+
     public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new 
PropertyDescriptor.Builder()
         .name("Distributed Cache Service")
         .description("Specifies the Controller Service that should be used to 
maintain state about what has been pulled from the remote server so that if a 
new node "
@@ -153,21 +139,25 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         .identifiesControllerService(DistributedMapCacheClient.class)
         .build();
 
-
-
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
         .description("All FlowFiles that are received are routed to success")
         .build();
 
-
-    private volatile Set<String> latestIdentifiersListed = new HashSet<>();
     private volatile Long lastListingTime = null;
+    private volatile Long lastProcessedTime = 0L;
+    private volatile Long lastRunTime = 0L;
     private volatile boolean justElectedPrimaryNode = false;
-    private volatile boolean resetListing = false;
+    private volatile boolean resetState = false;
 
-    static final String TIMESTAMP = "timestamp";
-    static final String IDENTIFIER_PREFIX = "id";
+    /*
+     * A constant used in determining an internal "yield" when processing files. Given the logic that provides a pause on the newest
+     * files according to timestamp, this ensures that at least the specified number of millis has elapsed, to avoid being scheduled
+     * nearly instantaneously after the prior iteration, which would effectively void the built-in buffer
+     */
+    static final long LISTING_LAG_MILLIS = 100L;
+    static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
+    static final String PROCESSED_TIMESTAMP_KEY = "processed.timestamp";
 
     protected File getPersistenceFile() {
         return new File("conf/state/" + getIdentifier());
@@ -183,12 +173,12 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
         if (isConfigurationRestored() && isListingResetNecessary(descriptor)) {
-            lastListingTime = null; // clear lastListingTime so that we have 
to fetch new time
-            latestIdentifiersListed = new HashSet<>();
-            resetListing = true;
+            resetTimeStates(); // clear lastListingTime so that we have to 
fetch new time
+            resetState = true;
         }
     }
 
+
     @Override
     public Set<Relationship> getRelationships() {
         final Set<Relationship> relationships = new HashSet<>();
@@ -218,49 +208,53 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
             }
         }
 
-        // remove entry from Distributed cache server
-        if (client != null) {
-            try {
-                client.remove(path, new StringSerDe());
-            } catch (final IOException ioe) {
-                getLogger().warn("Failed to remove entry from Distributed 
Cache Service. However, the state has already been migrated to use the new "
-                    + "State Management service, so the Distributed Cache 
Service is no longer needed.");
-            }
+        // When scheduled to run, check if the associated timestamp is null, signifying a clearing of state, and reset the internal timestamps
+        if (lastListingTime != null && stateMap.get(LISTING_TIMESTAMP_KEY) == null) {
+            getLogger().info("Detected that state was cleared for this component. Resetting internal values.");
+            resetTimeStates();
         }
 
-        if (resetListing) {
+        if (resetState) {
             context.getStateManager().clear(getStateScope(context));
-            resetListing = false;
+            resetState = false;
         }
     }
 
     /**
      * This processor used to use the DistributedMapCacheClient in order to 
store cluster-wide state, before the introduction of
      * the StateManager. This method will migrate state from that 
DistributedMapCacheClient, or from a local file, to the StateManager,
-     * if any state already exists
+     * if any state already exists. More specifically, this will extract out 
the relevant timestamp for when the processor last ran
      *
-     * @param path the path to migrate state for
-     * @param client the DistributedMapCacheClient that is capable of 
obtaining the current state
+     * @param path         the path to migrate state for
+     * @param client       the DistributedMapCacheClient that is capable of 
obtaining the current state
      * @param stateManager the StateManager to use in order to store the new 
state
-     * @param scope the scope to use
+     * @param scope        the scope to use
      * @throws IOException if unable to retrieve or store the state
      */
     private void migrateState(final String path, final 
DistributedMapCacheClient client, final StateManager stateManager, final Scope 
scope) throws IOException {
         Long minTimestamp = null;
-        final Set<String> latestIdentifiersListed = new HashSet<>();
 
-        // Retrieve state from Distributed Cache Client
+        // Retrieve state from Distributed Cache Client, establishing the 
latest file seen
         if (client != null) {
             final StringSerDe serde = new StringSerDe();
             final String serializedState = client.get(getKey(path), serde, 
serde);
             if (serializedState != null && !serializedState.isEmpty()) {
                 final EntityListing listing = deserialize(serializedState);
                 minTimestamp = listing.getLatestTimestamp().getTime();
-                
latestIdentifiersListed.addAll(listing.getMatchingIdentifiers());
+            }
+
+            // remove entry from distributed cache server
+            if (client != null) {
+                try {
+                    client.remove(path, new StringSerDe());
+                } catch (final IOException ioe) {
+                    getLogger().warn("Failed to remove entry from Distributed 
Cache Service. However, the state has already been migrated to use the new "
+                        + "State Management service, so the Distributed Cache 
Service is no longer needed.");
+                }
             }
         }
 
-        // Retrieve state from locally persisted file
+        // Retrieve state from locally persisted file, and compare it to the minTimestamp established from the distributed cache, if there was one
         final File persistenceFile = getPersistenceFile();
         if (persistenceFile.exists()) {
             final Properties props = new Properties();
@@ -273,10 +267,9 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
             if (locallyPersistedValue != null) {
                 final EntityListing listing = 
deserialize(locallyPersistedValue);
                 final long localTimestamp = 
listing.getLatestTimestamp().getTime();
+                // if the local file's latest timestamp is beyond that of the 
value provided from the cache, replace
                 if (minTimestamp == null || localTimestamp > minTimestamp) {
                     minTimestamp = localTimestamp;
-                    latestIdentifiersListed.clear();
-                    
latestIdentifiersListed.addAll(listing.getMatchingIdentifiers());
                 }
             }
 
@@ -287,18 +280,14 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         }
 
         if (minTimestamp != null) {
-            persist(minTimestamp, latestIdentifiersListed, stateManager, 
scope);
+            persist(minTimestamp, minTimestamp, stateManager, scope);
         }
     }
 
-    private void persist(final long timestamp, final Collection<String> 
identifiers, final StateManager stateManager, final Scope scope) throws 
IOException {
-        final Map<String, String> updatedState = new 
HashMap<>(identifiers.size() + 1);
-        updatedState.put(TIMESTAMP, String.valueOf(timestamp));
-        int counter = 0;
-        for (final String identifier : identifiers) {
-            final String index = String.valueOf(++counter);
-            updatedState.put(IDENTIFIER_PREFIX + "." + index, identifier);
-        }
+    private void persist(final long listingTimestamp, final long 
processedTimestamp, final StateManager stateManager, final Scope scope) throws 
IOException {
+        final Map<String, String> updatedState = new HashMap<>(1);
+        updatedState.put(LISTING_TIMESTAMP_KEY, 
String.valueOf(listingTimestamp));
+        updatedState.put(PROCESSED_TIMESTAMP_KEY, 
String.valueOf(processedTimestamp));
         stateManager.setState(updatedState, scope);
     }
 
@@ -316,41 +305,38 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         Long minTimestamp = lastListingTime;
-        try {
-            // We need to fetch the state from the cluster if we don't yet 
know the last listing time,
-            // or if we were just elected the primary node
-            if (this.lastListingTime == null || justElectedPrimaryNode) {
+
+        if (this.lastListingTime == null || this.lastProcessedTime == null || 
justElectedPrimaryNode) {
+            try {
+                // Attempt to retrieve state from the state manager if a last 
listing was not yet established or
+                // if just elected the primary node
                 final StateMap stateMap = 
context.getStateManager().getState(getStateScope(context));
-                final Map<String, String> stateValues = stateMap.toMap();
-                final String timestamp = stateValues.get(TIMESTAMP);
-
-                if (timestamp == null) {
-                    minTimestamp = 0L;
-                    latestIdentifiersListed.clear();
-                } else {
-                    minTimestamp = this.lastListingTime = 
Long.parseLong(timestamp);
-                    latestIdentifiersListed.clear();
-                    for (final Map.Entry<String, String> entry : 
stateValues.entrySet()) {
-                        final String key = entry.getKey();
-                        final String value = entry.getValue();
-                        if (TIMESTAMP.equals(key)) {
-                            continue;
-                        }
-
-                        latestIdentifiersListed.add(value);
+                final String listingTimestampString = 
stateMap.get(LISTING_TIMESTAMP_KEY);
+                final String lastProcessedString = stateMap.get(PROCESSED_TIMESTAMP_KEY);
+                if (lastProcessedString != null) {
+                    this.lastProcessedTime = 
Long.parseLong(lastProcessedString);
+                }
+                if (listingTimestampString != null) {
+                    minTimestamp = Long.parseLong(listingTimestampString);
+                    // If our determined timestamp is the same as that of our 
last listing, skip this execution as there are no updates
+                    if (minTimestamp == this.lastListingTime) {
+                        context.yield();
+                        return;
+                    } else {
+                        this.lastListingTime = minTimestamp;
                     }
                 }
-
                 justElectedPrimaryNode = false;
+            } catch (final IOException ioe) {
+                getLogger().error("Failed to retrieve timestamp of last 
listing from the State Manager. Will not perform listing until this is 
accomplished.");
+                context.yield();
+                return;
             }
-        } catch (final IOException ioe) {
-            getLogger().error("Failed to retrieve timestamp of last listing 
from the State Manager. Will not perform listing until this is accomplished.");
-            context.yield();
-            return;
         }
 
         final List<T> entityList;
         try {
+            // keep track of when this last executed, for consideration of the lag millis
             entityList = performListing(context, minTimestamp);
         } catch (final IOException e) {
             getLogger().error("Failed to perform listing on remote host due to 
{}", e);
@@ -358,65 +344,93 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
             return;
         }
 
-        if (entityList == null) {
+        if (entityList == null || entityList.isEmpty()) {
             context.yield();
             return;
         }
 
         Long latestListingTimestamp = null;
-        final List<T> newEntries = new ArrayList<>();
+        final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
+
+        // Build a sorted map to determine the latest possible entries
         for (final T entity : entityList) {
-            final boolean newTimestamp = minTimestamp == null || 
entity.getTimestamp() > minTimestamp;
-            final boolean newEntryForTimestamp = minTimestamp != null && 
entity.getTimestamp() == minTimestamp && 
!latestIdentifiersListed.contains(entity.getIdentifier());
-            final boolean list = newTimestamp || newEntryForTimestamp;
-
-            // Create the FlowFile for this path.
-            if (list) {
-                final Map<String, String> attributes = 
createAttributes(entity, context);
-                FlowFile flowFile = session.create();
-                flowFile = session.putAllAttributes(flowFile, attributes);
-                session.transfer(flowFile, REL_SUCCESS);
-
-                // If we don't have a new timestamp but just have a new entry, 
we need to
-                // add all of the previous entries to our entityList. If we 
have a new timestamp,
-                // then the previous entries can go away.
-                if (!newTimestamp) {
-                    newEntries.addAll(entityList);
+            final long entityTimestamp = entity.getTimestamp();
+            // New entries are all those that occur at or after the associated timestamp and that have not already been processed
+            final boolean newEntry = minTimestamp == null || (entityTimestamp >= minTimestamp && entityTimestamp > lastProcessedTime);
+
+            if (newEntry) {
+                List<T> entitiesForTimestamp = 
orderedEntries.get(entity.getTimestamp());
+                if (entitiesForTimestamp == null) {
+                    entitiesForTimestamp = new ArrayList<T>();
+                    orderedEntries.put(entity.getTimestamp(), 
entitiesForTimestamp);
+                }
+                entitiesForTimestamp.add(entity);
+            }
+        }
+
+        int flowfilesCreated = 0;
+
+        if (orderedEntries.size() > 0) {
+            latestListingTimestamp = orderedEntries.lastKey();
+
+            // If the last listing time is equal to the newest entries 
previously seen,
+            // another iteration has occurred without new files and special 
handling is needed to avoid starvation
+            if (latestListingTimestamp.equals(lastListingTime)) {
+                /* We are done when either:
+                 *   - we have not eclipsed the minimal listing lag needed, due to being triggered too soon after the last run
+                 *   - the latest listing timestamp is equal to the last processed time, meaning we handled those items originally passed over
+                 */
+                if (System.currentTimeMillis() - lastRunTime < 
LISTING_LAG_MILLIS || latestListingTimestamp.equals(lastProcessedTime)) {
+                    context.yield();
+                    return;
                 }
-                newEntries.add(entity);
+            } else {
+                // Otherwise, the newest entries are held back one cycle to avoid missing data from writes occurring exactly when the listing is being performed
+                orderedEntries.remove(latestListingTimestamp);
+            }
 
-                if (latestListingTimestamp == null || entity.getTimestamp() > 
latestListingTimestamp) {
-                    latestListingTimestamp = entity.getTimestamp();
+            for (List<T> timestampEntities : orderedEntries.values()) {
+                for (T entity : timestampEntities) {
+                    // Create the FlowFile for this path.
+                    final Map<String, String> attributes = 
createAttributes(entity, context);
+                    FlowFile flowFile = session.create();
+                    flowFile = session.putAllAttributes(flowFile, attributes);
+                    session.transfer(flowFile, REL_SUCCESS);
+                    flowfilesCreated++;
                 }
             }
         }
 
-        final int listCount = newEntries.size();
-        if (listCount > 0) {
-            getLogger().info("Successfully created listing with {} new 
objects", new Object[] {listCount});
-            session.commit();
-
-            // We have performed a listing and pushed the FlowFiles out.
-            // Now, we need to persist state about the Last Modified timestamp 
of the newest file
-            // that we pulled in. We do this in order to avoid pulling in the 
same file twice.
-            // However, we want to save the state both locally and remotely.
-            // We store the state remotely so that if a new Primary Node is 
chosen, it can pick up where the
-            // previously Primary Node left off.
-            // We also store the state locally so that if the node is 
restarted, and the node cannot contact
-            // the distributed state cache, the node can continue to run (if 
it is primary node).
-            final Set<String> identifiers = new HashSet<>(newEntries.size());
-            try {
-                for (final T entity : newEntries) {
-                    identifiers.add(entity.getIdentifier());
+        // As long as we have a listing timestamp, there is meaningful state 
to capture regardless of any outputs generated
+        if (latestListingTimestamp != null) {
+            boolean processedNewFiles = flowfilesCreated > 0;
+            if (processedNewFiles) {
+                // If there have been files created, update the last timestamp 
we processed
+                lastProcessedTime = orderedEntries.lastKey();
+                getLogger().info("Successfully created listing with {} new 
objects", new Object[]{flowfilesCreated});
+                session.commit();
+            }
+
+            lastRunTime = System.currentTimeMillis();
+
+            if (!latestListingTimestamp.equals(lastListingTime) || 
processedNewFiles) {
+                // We have performed a listing and pushed any FlowFiles out 
that may have been generated
+                // Now, we need to persist state about the Last Modified 
timestamp of the newest file
+                // that we evaluated. We do this in order to avoid pulling in 
the same file twice.
+                // However, we want to save the state both locally and 
remotely.
+                // We store the state remotely so that if a new Primary Node 
is chosen, it can pick up where the
+                // previously Primary Node left off.
+                // We also store the state locally so that if the node is 
restarted, and the node cannot contact
+                // the distributed state cache, the node can continue to run 
(if it is primary node).
+                try {
+                    lastListingTime = latestListingTimestamp;
+                    persist(latestListingTimestamp, lastProcessedTime, 
context.getStateManager(), getStateScope(context));
+                } catch (final IOException ioe) {
+                    getLogger().warn("Unable to save state due to {}. If NiFi 
is restarted before state is saved, or "
+                        + "if another node begins executing this Processor, 
data duplication may occur.", ioe);
                 }
-                persist(latestListingTimestamp, identifiers, 
context.getStateManager(), getStateScope(context));
-            } catch (final IOException ioe) {
-                getLogger().warn("Unable to save state due to {}. If NiFi is 
restarted before state is saved, or "
-                    + "if another node begins executing this Processor, data 
duplication may occur.", ioe);
             }
 
-            lastListingTime = latestListingTimestamp;
-            latestIdentifiersListed = identifiers;
         } else {
             getLogger().debug("There is no data to list. Yielding.");
             context.yield();
@@ -430,13 +444,18 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         }
     }
 
+    private void resetTimeStates() {
+        lastListingTime = null;
+        lastProcessedTime = 0L;
+        lastRunTime = 0L;
+    }
 
     /**
      * Creates a Map of attributes that should be applied to the FlowFile to 
represent this entity. This processor will emit a FlowFile for each "new" entity
      * (see the documentation for this class for a discussion of how this 
class determines whether or not an entity is "new"). The FlowFile will contain 
no
      * content. The attributes that will be included are exactly the 
attributes that are returned by this method.
      *
-     * @param entity the entity represented by the FlowFile
+     * @param entity  the entity represented by the FlowFile
      * @param context the ProcessContext for obtaining configuration 
information
      * @return a Map of attributes for this entity
      */
@@ -460,9 +479,8 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
      * will be filtered out by the Processor. Therefore, it is not necessary 
that implementations perform this filtering but can be more efficient
      * if the filtering can be performed on the server side prior to 
retrieving the information.
      *
-     * @param context the ProcessContex to use in order to pull the 
appropriate entities
+     * @param context      the ProcessContext to use in order to pull the appropriate entities
      * @param minTimestamp the minimum timestamp of entities that should be 
returned.
-     *
      * @return a Listing of entities that have a timestamp >= minTimestamp
      */
     protected abstract List<T> performListing(final ProcessContext context, 
final Long minTimestamp) throws IOException;
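
A rough companion to the class-level documentation above: a hypothetical, simplified model, not
NiFi code, of the run-to-run decision implied by the new listing.timestamp / processed.timestamp
keys and LISTING_LAG_MILLIS. Entries held at the newest timestamp are only released once a later
run sees that same newest timestamp again, the listing lag has elapsed, and that timestamp has
not already been processed.

    final class ListingStateModel {
        // Mirrors the constant added in AbstractListProcessor; all other names here are illustrative.
        static final long LISTING_LAG_MILLIS = 100L;

        Long lastListingTime;        // newest timestamp seen by a prior listing (listing.timestamp)
        long lastProcessedTime = 0L; // newest timestamp actually emitted (processed.timestamp)
        long lastRunTime = 0L;       // wall-clock time of the previous run

        // True if entries at 'newestSeen' may be released on this run.
        boolean mayReleaseHeldEntries(final long newestSeen, final long now) {
            if (lastListingTime == null || lastListingTime.longValue() != newestSeen) {
                // A newer timestamp appeared (or no prior listing); hold those entries one more cycle.
                return false;
            }
            // Same newest timestamp as the prior run: release only if the lag has elapsed
            // and these entries were not already emitted.
            return now - lastRunTime >= LISTING_LAG_MILLIS && newestSeen != lastProcessedTime;
        }
    }

In the actual processor this check is interleaved with the TreeMap-based grouping shown in the
hunk above, and the two timestamps are persisted through the StateManager between runs.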

http://git-wip-us.apache.org/repos/asf/nifi/blob/1a512cd1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
index e82be2c..b64e80d 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
@@ -95,8 +95,8 @@ import org.apache.nifi.processors.standard.util.FileInfo;
                 "rw-rw-r--")
 })
 @SeeAlso({GetFile.class, PutFile.class, FetchFile.class})
-@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After 
performing a listing of files, the timestamp of the newest file is stored, "
-    + "along with the filenames of all files that share that same timestamp. 
This allows the Processor to list only files that have been added or modified 
after "
+@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After 
performing a listing of files, the timestamp of the newest file is stored. "
+    + "This allows the Processor to list only files that have been added or 
modified after "
     + "this date the next time that the Processor is run. Whether the state is 
stored with a Local or Cluster scope depends on the value of the "
     + "<Input Directory Location> property.")
 public class ListFile extends AbstractListProcessor<FileInfo> {

http://git-wip-us.apache.org/repos/asf/nifi/blob/1a512cd1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index d92f398..f2df7da 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -51,8 +51,8 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer;
     @WritesAttribute(attribute = "filename", description = "The name of the 
file on the SFTP Server"),
     @WritesAttribute(attribute = "path", description = "The fully qualified 
name of the directory on the SFTP Server from which the file was pulled"),
 })
-@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing 
of files, the timestamp of the newest file is stored, "
-    + "along with the filename all files that share that same timestamp. This 
allows the Processor to list only files that have been added or modified after "
+@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing 
of files, the timestamp of the newest file is stored. "
+    + "This allows the Processor to list only files that have been added or 
modified after "
     + "this date the next time that the Processor is run. State is stored 
across the cluster so that this Processor can be run on Primary Node only and 
if "
     + "a new Primary Node is selected, the new node will not duplicate the 
data that was listed by the previous Primary Node.")
 public class ListSFTP extends ListFileTransfer {

http://git-wip-us.apache.org/repos/asf/nifi/blob/1a512cd1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
index 7544eb8..a85ff2a 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
@@ -18,7 +18,6 @@
 package org.apache.nifi.processors.standard;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -55,61 +54,127 @@ public class TestAbstractListProcessor {
     public final TemporaryFolder testFolder = new TemporaryFolder();
 
     @Test
-    public void testOnlyNewEntriesEmitted() {
+    public void testOnlyNewEntriesEmitted() throws Exception {
         final ConcreteListProcessor proc = new ConcreteListProcessor();
         final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.run();
 
+        final long initialTimestamp = System.currentTimeMillis();
+
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        proc.addEntity("name", "id", 1492L);
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);
         runner.run();
 
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        // First run, the above listed entries would be skipped to avoid write 
synchronization issues
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();
 
-        proc.addEntity("name", "id2", 1492L);
+        // Ensure we have covered the necessary lag period to avoid issues 
where the processor was immediately scheduled to run again
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        // Running again, our two previously seen files are now cleared to be 
released
         runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        // Verify that already-seen files do not show up again
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();
 
-        proc.addEntity("name", "id2", 1492L);
+        proc.addEntity("name", "id3", initialTimestamp - 1);
         runner.run();
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();
 
-        proc.addEntity("name", "id3", 1491L);
+        proc.addEntity("name", "id2", initialTimestamp);
         runner.run();
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();
 
-        proc.addEntity("name", "id2", 1492L);
+        // Now a new file beyond the current time enters
+        proc.addEntity("name", "id2", initialTimestamp + 1);
+
+        // Nothing occurs for the first iteration as it is withheld
         runner.run();
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();
 
-        proc.addEntity("name", "id2", 1493L);
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        // But it should now show up, given that the appropriate pause has been eclipsed
         runner.run();
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
         runner.clearTransferState();
+    }
+
+    @Test
+    public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() 
throws Exception {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+
+        final long initialTimestamp = System.currentTimeMillis();
+
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);
+
+        // Emulate having state but not having had the processor run such as 
in a restart
+        final Map<String, String> preexistingState = new HashMap<>();
+        preexistingState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, 
Long.toString(initialTimestamp));
+        preexistingState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, 
Long.toString(initialTimestamp));
+        runner.getStateManager().setState(preexistingState, Scope.CLUSTER);
+
+        // run for the first time
+        runner.run();
+
+        // First run, the above listed entries would be skipped
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Ensure we have covered the necessary lag period to avoid issues 
where the processor was immediately scheduled to run again
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        // Running again, these files would normally be eligible for transfer but, per the restored state, are again skipped
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Verify that already-seen files do not show up again
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id3", initialTimestamp - 1);
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
 
-        proc.addEntity("name", "id2", 1493L);
+        proc.addEntity("name", "id2", initialTimestamp);
         runner.run();
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();
 
-        proc.addEntity("name", "id2", 1493L);
+        // Now a new file beyond the current time enters
+        proc.addEntity("name", "id2", initialTimestamp + 1);
+
+        // Nothing occurs for the first iteration as it is withheld
         runner.run();
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();
 
-        proc.addEntity("name", "id", 1494L);
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        // But it should now show up, given that the appropriate pause has been eclipsed
         runner.run();
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
         runner.clearTransferState();
     }
 
     @Test
-    public void testStateStoredInClusterStateManagement() throws 
InitializationException {
+    public void testStateStoredInClusterStateManagement() throws Exception {
         final ConcreteListProcessor proc = new ConcreteListProcessor();
         final TestRunner runner = TestRunners.newTestRunner(proc);
         final DistributedCache cache = new DistributedCache();
@@ -123,8 +188,17 @@ public class TestAbstractListProcessor {
         runner.run();
 
         final Map<String, String> expectedState = new HashMap<>();
-        expectedState.put(AbstractListProcessor.TIMESTAMP, "1492");
-        expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", 
"id");
+        // Ensure only timestamp is migrated
+        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
+        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "0");
+        runner.getStateManager().assertStateEquals(expectedState, 
Scope.CLUSTER);
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS);
+
+        runner.run();
+        // Ensure only timestamp is migrated
+        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
+        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, 
"1492");
         runner.getStateManager().assertStateEquals(expectedState, 
Scope.CLUSTER);
     }
 
@@ -145,8 +219,9 @@ public class TestAbstractListProcessor {
 
         final MockStateManager stateManager = runner.getStateManager();
         final Map<String, String> expectedState = new HashMap<>();
-        expectedState.put(AbstractListProcessor.TIMESTAMP, "1492");
-        expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", 
"id");
+        // Ensure only timestamp is migrated
+        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
+        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, 
"1492");
         stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
     }
 
@@ -188,13 +263,59 @@ public class TestAbstractListProcessor {
 
         // Verify the state manager now maintains the associated state
         final Map<String, String> expectedState = new HashMap<>();
-        expectedState.put(AbstractListProcessor.TIMESTAMP, "1492");
-        expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", 
"id");
-
+        // Ensure only timestamp is migrated
+        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
+        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, 
"1492");
         runner.getStateManager().assertStateEquals(expectedState, 
Scope.CLUSTER);
     }
 
     @Test
+    public void testResumeListingAfterClearingState() throws Exception {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.run();
+
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+
+        final long initialEventTimestamp = System.currentTimeMillis();
+        proc.addEntity("name", "id", initialEventTimestamp);
+        proc.addEntity("name", "id2", initialEventTimestamp);
+
+        // Add entities but these should not be transferred as they are the 
latest values
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+
+        // After providing a pause in listings, the files should now transfer
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        // Verify entities are not transferred again for the given state
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Clear state for this processor, eradicating timestamp
+        runner.getStateManager().clear(Scope.CLUSTER);
+        Assert.assertEquals("State is not empty for this component after 
clearing", 0, runner.getStateManager().getState(Scope.CLUSTER).toMap().size());
+
+
+        // As before, we are unsure of when these files were delivered 
relative to system time, and additional cycle(s) need to occur before transfer
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        // Ensure the original files are now transferred again.
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+    }
+
+    @Test
     public void testFetchOnStart() throws InitializationException {
         final ConcreteListProcessor proc = new ConcreteListProcessor();
         final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -209,41 +330,65 @@ public class TestAbstractListProcessor {
     }
 
     @Test
-    public void testOnlyNewStateStored() throws IOException {
+    public void testOnlyNewStateStored() throws Exception {
         final ConcreteListProcessor proc = new ConcreteListProcessor();
         final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.run();
 
+        final long initialTimestamp = System.currentTimeMillis();
+
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        proc.addEntity("name", "id", 1492L);
-        proc.addEntity("name", "id2", 1492L);
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);
+
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
 
         runner.run();
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
         runner.clearTransferState();
 
         final StateMap stateMap = 
runner.getStateManager().getState(Scope.CLUSTER);
-        assertEquals(1, stateMap.getVersion());
+        assertEquals(2, stateMap.getVersion());
 
         final Map<String, String> map = stateMap.toMap();
-        assertEquals(3, map.size());
-        assertEquals("1492", map.get("timestamp"));
-        assertTrue(map.containsKey("id.1"));
-        assertTrue(map.containsKey("id.2"));
+        // Ensure only timestamp is migrated
+        assertEquals(2, map.size());
+        assertEquals(Long.toString(initialTimestamp), 
map.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
+        assertEquals(Long.toString(initialTimestamp), 
map.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
 
-        proc.addEntity("new name", "new id", 1493L);
+        proc.addEntity("new name", "new id", initialTimestamp + 1);
         runner.run();
 
+        // Verify that the new entry has not been emitted but it has triggered 
an updated state
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        StateMap updatedStateMap = 
runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(3, updatedStateMap.getVersion());
+
+        assertEquals(2, updatedStateMap.toMap().size());
+        assertEquals(Long.toString(initialTimestamp + 1), 
updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
+        // Processed timestamp is lagging behind currently
+        assertEquals(Long.toString(initialTimestamp), 
updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
-        final StateMap updatedStateMap = 
runner.getStateManager().getState(Scope.CLUSTER);
-        assertEquals(2, updatedStateMap.getVersion());
+        runner.clearTransferState();
 
-        final Map<String, String> updatedValues = updatedStateMap.toMap();
-        assertEquals(2, updatedValues.size());
-        assertEquals("1493", updatedValues.get("timestamp"));
-        assertEquals("new id", updatedValues.get("id.1"));
-    }
+        updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(4, updatedStateMap.getVersion());
 
+        assertEquals(2, updatedStateMap.toMap().size());
+        assertEquals(Long.toString(initialTimestamp + 1), 
updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
+        // Processed timestamp is now caught up
+        assertEquals(Long.toString(initialTimestamp + 1), 
updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
+    }
 
     private static class DistributedCache extends AbstractControllerService 
implements DistributedMapCacheClient {
         private final Map<Object, Object> stored = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/nifi/blob/1a512cd1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index 93319d0..441a561 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -45,7 +45,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestListFile  {
+public class TestListFile {
 
     final String TESTDIR = "target/test/data/in";
     final File testDir = new File(TESTDIR);
@@ -112,9 +112,16 @@ public class TestListFile  {
         // process first file and set new timestamp
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.run();
+
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
-        final List<MockFlowFile> successFiles = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(1, successFiles.size());
+        final List<MockFlowFile> successFiles1 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
+        assertEquals(1, successFiles1.size());
 
         // create second file
         final File file2 = new File(TESTDIR + "/listing2.txt");
@@ -124,6 +131,12 @@ public class TestListFile  {
         // process second file after timestamp
         runner.clearTransferState();
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles2.size());
@@ -147,11 +160,18 @@ public class TestListFile  {
         runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles4 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(3, successFiles4.size());
+        assertEquals(2, successFiles4.size());
+
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 1);
     }
 
     @Test
-    public void testFilterAge() throws IOException {
+    public void testFilterAge() throws Exception {
         final File file1 = new File(TESTDIR + "/age1.txt");
         assertTrue(file1.createNewFile());
         assertTrue(file1.setLastModified(time0millis));
@@ -167,9 +187,15 @@ public class TestListFile  {
         // check all files
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(3, successFiles1.size());
+        assertEquals(1, successFiles1.size());
 
         // exclude oldest
         runner.clearTransferState();
@@ -178,7 +204,13 @@ public class TestListFile  {
         runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(2, successFiles2.size());
+        assertEquals(1, successFiles2.size());
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 1);
 
         // exclude newest
         runner.clearTransferState();
@@ -187,7 +219,7 @@ public class TestListFile  {
         runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(2, successFiles3.size());
+        assertEquals(1, successFiles3.size());
 
         // exclude oldest and newest
         runner.clearTransferState();
@@ -196,11 +228,18 @@ public class TestListFile  {
         runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(1, successFiles4.size());
+        assertEquals(0, successFiles4.size());
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 1);
     }
 
     @Test
-    public void testFilterSize() throws IOException {
+    public void testFilterSize() throws Exception {
         final byte[] bytes1000 = new byte[1000];
         final byte[] bytes5000 = new byte[5000];
         final byte[] bytes10000 = new byte[10000];
@@ -227,6 +266,13 @@ public class TestListFile  {
         // check all files
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(3, successFiles1.size());
@@ -238,6 +284,14 @@ public class TestListFile  {
         runner.setProperty(ListFile.MIN_SIZE, "0 b");
         runner.setProperty(ListFile.MAX_SIZE, "7500 b");
         runner.run();
+
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(2, successFiles2.size());
@@ -249,6 +303,12 @@ public class TestListFile  {
         runner.setProperty(ListFile.MIN_SIZE, "2500 b");
         runner.removeProperty(ListFile.MAX_SIZE);
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(2, successFiles3.size());
@@ -260,13 +320,20 @@ public class TestListFile  {
         runner.setProperty(ListFile.MIN_SIZE, "2500 b");
         runner.setProperty(ListFile.MAX_SIZE, "7500 b");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles4.size());
     }
 
     @Test
-    public void testFilterHidden() throws IOException {
+    public void testFilterHidden() throws Exception {
         FileOutputStream fos;
 
         final File file1 = new File(TESTDIR + "/hidden1.txt");
@@ -293,6 +360,11 @@ public class TestListFile  {
         runner.removeProperty(ListFile.MAX_SIZE);
         runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "false");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(2, successFiles1.size());
@@ -301,13 +373,18 @@ public class TestListFile  {
         runner.clearTransferState();
         runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "true");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles2.size());
     }
 
     @Test
-    public void testFilterFilePattern() throws IOException {
+    public void testFilterFilePattern() throws Exception {
         final File file1 = new File(TESTDIR + "/file1-abc-apple.txt");
         assertTrue(file1.createNewFile());
 
@@ -325,6 +402,11 @@ public class TestListFile  {
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue());
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(4, successFiles1.size());
@@ -333,13 +415,18 @@ public class TestListFile  {
         runner.clearTransferState();
         runner.setProperty(ListFile.FILE_FILTER, ".*-xyz-.*");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(2, successFiles2.size());
     }
 
     @Test
-    public void testFilterPathPattern() throws IOException {
+    public void testFilterPathPattern() throws Exception {
         final File subdir1 = new File(TESTDIR + "/subdir1");
         assertTrue(subdir1.mkdirs());
 
@@ -363,6 +450,11 @@ public class TestListFile  {
         runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue());
         runner.setProperty(ListFile.RECURSE, "true");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(4, successFiles1.size());
@@ -372,6 +464,11 @@ public class TestListFile  {
         runner.setProperty(ListFile.PATH_FILTER, "subdir1");
         runner.setProperty(ListFile.RECURSE, "true");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(3, successFiles2.size());
@@ -381,13 +478,18 @@ public class TestListFile  {
         runner.setProperty(ListFile.PATH_FILTER, "subdir2");
         runner.setProperty(ListFile.RECURSE, "true");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles3.size());
     }
 
     @Test
-    public void testRecurse() throws IOException {
+    public void testRecurse() throws Exception {
         final File subdir1 = new File(TESTDIR + "/subdir1");
         assertTrue(subdir1.mkdirs());
 
@@ -408,6 +510,12 @@ public class TestListFile  {
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.RECURSE, "true");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 3);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         for (final MockFlowFile mff : successFiles1) {
@@ -435,13 +543,21 @@ public class TestListFile  {
         runner.clearTransferState();
         runner.setProperty(ListFile.RECURSE, "false");
         runner.run();
+
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles2.size());
     }
 
     @Test
-    public void testReadable() throws IOException {
+    public void testReadable() throws Exception {
         final File file1 = new File(TESTDIR + "/file1.txt");
         assertTrue(file1.createNewFile());
 
@@ -456,13 +572,18 @@ public class TestListFile  {
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.RECURSE, "true");
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
-        final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(3, successFiles1.size());
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
     }
 
     @Test
-    public void testAttributesSet() throws IOException {
+    public void testAttributesSet() throws Exception {
         // create temp file and time constant
         final File file1 = new File(TESTDIR + "/file1.txt");
         assertTrue(file1.createNewFile());
@@ -477,6 +598,13 @@ public class TestListFile  {
         runner.clearTransferState();
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.run();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        runner.run();
+
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles1.size());
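
For reference, the pattern repeated throughout these test changes follows from the new
timestamp-lag behavior: a listing performed within the lag window of a file's modification
time holds that file back, so each test now runs the processor once, waits out the lag, and
runs it again before asserting on REL_SUCCESS. The sketch below shows that idiom in
isolation using the NiFi mock framework (TestRunners/TestRunner). The class and helper
names (ListingLagExample, runPastListingLag) are illustrative only, and the class is placed
in the org.apache.nifi.processors.standard package on the assumption that the
LISTING_LAG_MILLIS constant added by this commit is visible there.

package org.apache.nifi.processors.standard;

import java.io.File;
import java.nio.file.Files;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class ListingLagExample {

    // Hypothetical helper capturing the two-run idiom used in the tests above:
    // the first run is expected to hold back files whose timestamps fall inside
    // the lag window; after sleeping past the window, a second run releases them.
    static void runPastListingLag(final TestRunner runner) throws InterruptedException {
        runner.run();
        runner.assertTransferCount(ListFile.REL_SUCCESS, 0); // newest files held back
        runner.clearTransferState();

        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); // let the lag window pass

        runner.run();
        runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
    }

    public static void main(final String[] args) throws Exception {
        // Create a directory containing a single just-written file.
        final File dir = Files.createTempDirectory("listing-lag").toFile();
        final File file = new File(dir, "example.txt");
        if (!file.createNewFile()) {
            throw new IllegalStateException("could not create " + file);
        }

        final TestRunner runner = TestRunners.newTestRunner(new ListFile());
        runner.setProperty(ListFile.DIRECTORY, dir.getAbsolutePath());

        runPastListingLag(runner);

        // After the second run the file should have been listed exactly once.
        for (final MockFlowFile mff : runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS)) {
            System.out.println(mff.getAttribute("filename"));
        }
    }
}

The two-run structure mirrors the assertions added above: the first run transfers nothing
because the freshly written file is still inside the lag window, and the second run, after
sleeping twice LISTING_LAG_MILLIS, is expected to emit it.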
