Repository: nifi
Updated Branches:
  refs/heads/master a4e729c7a -> e68ff153e


NIFI-4069: Make ListXXX work with timestamp precision in seconds or minutes

- Refactored variable names to better represent what they are meant for.
- Added deterministic logic which detects the target filesystem's timestamp
precision and adjusts the lag time based on it.
- Changed from using System.nanoTime() to System.currentTimeMillis() in tests
because the Java File API reports timestamps in milliseconds at best.
Also, System.nanoTime() should not be mixed with epoch milliseconds
because it uses an arbitrary origin and is measured differently.
- Changed TestListFile to use longer intervals between the file timestamps
used by testFilterAge, to provide more consistent test results, because the
sleep time can be longer on filesystems whose timestamps have seconds precision.
- Added logging to TestListFile.
- Added a TestWatcher to dump state for further investigation when an
assertion fails.
- Added a Timestamp Precision property so that users can set the precision
explicitly when auto-detect is not enough.
- Adjusted timestamps for the ages test.

This closes #1915.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/28ee7022
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/28ee7022
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/28ee7022

Branch: refs/heads/master
Commit: 28ee70222b892fb799f5f74a31a9de678d9fb629
Parents: a4e729c
Author: Koji Kawamura <[email protected]>
Authored: Wed Jun 14 15:21:01 2017 +0900
Committer: Bryan Bende <[email protected]>
Committed: Mon Aug 28 11:31:03 2017 -0400

----------------------------------------------------------------------
 .../nifi-processor-utils/pom.xml                |   2 +-
 .../util/list/AbstractListProcessor.java        | 161 +++++++++++-----
 .../util/list/ListProcessorTestWatcher.java     | 128 +++++++++++++
 .../processor/util/list/ListableEntity.java     |   2 +-
 .../util/list/TestAbstractListProcessor.java    | 190 +++++++++++++------
 .../nifi/processors/standard/ListFTP.java       |   1 +
 .../nifi/processors/standard/ListFile.java      |   1 +
 .../nifi/processors/standard/ListSFTP.java      |   1 +
 .../nifi/processors/standard/TestListFile.java  | 107 ++++++++---
 9 files changed, 463 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
index 3f5e60c..dd38e10 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
@@ -70,7 +70,7 @@
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <scope>test</scope>
+            <scope>compile</scope>
         </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 2666e2c..8d93a65 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -37,6 +38,7 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
@@ -138,14 +140,42 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         .identifiesControllerService(DistributedMapCacheClient.class)
         .build();
 
+    public static final AllowableValue PRECISION_AUTO_DETECT = new 
AllowableValue("auto-detect", "Auto Detect",
+    "Automatically detect time unit deterministically based on candidate 
entries timestamp."
+            + " Please note that this option may take longer to list entities 
unnecessarily, if none of entries has a precise precision timestamp."
+            + " E.g. even if a target system supports millis, if all entries 
only have timestamps without millis, such as '2017-06-16 09:06:34.000', then 
its precision is determined as 'seconds'.");
+    public static final AllowableValue PRECISION_MILLIS = new 
AllowableValue("millis", "Milliseconds",
+            "This option provides the minimum latency for an entry from being 
available to being listed if target system supports millis, if not, use other 
options.");
+    public static final AllowableValue PRECISION_SECONDS = new 
AllowableValue("seconds", "Seconds","For a target system that does not have 
millis precision, but has in seconds.");
+    public static final AllowableValue PRECISION_MINUTES = new 
AllowableValue("minutes", "Minutes", "For a target system that only supports 
precision in minutes.");
+
+    public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = 
new PropertyDescriptor.Builder()
+        .name("target-system-timestamp-precision")
+        .displayName("Target System Timestamp Precision")
+        .description("Specify timestamp precision at the target system."
+                + " Since this processor uses timestamp of entities to decide 
which should be listed, it is crucial to use the right timestamp precision.")
+        .required(true)
+        .allowableValues(PRECISION_AUTO_DETECT, PRECISION_MILLIS, 
PRECISION_SECONDS, PRECISION_MINUTES)
+        .defaultValue(PRECISION_AUTO_DETECT.getValue())
+        .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
         .description("All FlowFiles that are received are routed to success")
         .build();
 
-    private volatile Long lastListingTime = null;
-    private volatile Long lastProcessedTime = 0L;
-    private volatile Long lastRunTime = 0L;
+    /**
+     * Represents the timestamp of an entity which was the latest one within 
those listed at the previous cycle.
+     * It does not necessary mean it has been processed as well.
+     * Whether it was processed or not depends on target system time precision 
and how old the entity timestamp was.
+     */
+    private volatile Long lastListedLatestEntryTimestampMillis = null;
+    /**
+     * Represents the timestamp of an entity which was the latest one
+     * within those picked up and written to the output relationship at the 
previous cycle.
+     */
+    private volatile Long lastProcessedLatestEntryTimestampMillis = 0L;
+    private volatile Long lastRunTimeNanos = 0L;
     private volatile boolean justElectedPrimaryNode = false;
     private volatile boolean resetState = false;
 
@@ -154,9 +184,16 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
      * files according to timestamp, it is ensured that at least the specified 
millis has been eclipsed to avoid getting scheduled
      * near instantaneously after the prior iteration effectively voiding the 
built in buffer
      */
-    public static final long LISTING_LAG_NANOS = 
TimeUnit.MILLISECONDS.toNanos(100L);
-    static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
-    static final String PROCESSED_TIMESTAMP_KEY = "processed.timestamp";
+    public static final Map<TimeUnit, Long> LISTING_LAG_MILLIS;
+    static {
+        final Map<TimeUnit, Long> nanos = new HashMap<>();
+        nanos.put(TimeUnit.MILLISECONDS, 100L);
+        nanos.put(TimeUnit.SECONDS, 1_000L);
+        nanos.put(TimeUnit.MINUTES, 60_000L);
+        LISTING_LAG_MILLIS = Collections.unmodifiableMap(nanos);
+    }
+    static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = 
"listing.timestamp";
+    static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = 
"processed.timestamp";
 
     public File getPersistenceFile() {
         return new File("conf/state/" + getIdentifier());
@@ -166,6 +203,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(DISTRIBUTED_CACHE_SERVICE);
+        properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
         return properties;
     }
 
@@ -208,7 +246,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         }
 
         // When scheduled to run, check if the associated timestamp is null, 
signifying a clearing of state and reset the internal timestamp
-        if (lastListingTime != null && stateMap.get(LISTING_TIMESTAMP_KEY) == 
null) {
+        if (lastListedLatestEntryTimestampMillis != null && 
stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY) == null) {
             getLogger().info("Detected that state was cleared for this 
component.  Resetting internal values.");
             resetTimeStates();
         }
@@ -283,10 +321,12 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         }
     }
 
-    private void persist(final long listingTimestamp, final long 
processedTimestamp, final StateManager stateManager, final Scope scope) throws 
IOException {
+    private void persist(final long latestListedEntryTimestampThisCycleMillis,
+                         final long lastProcessedLatestEntryTimestampMillis,
+                         final StateManager stateManager, final Scope scope) 
throws IOException {
         final Map<String, String> updatedState = new HashMap<>(1);
-        updatedState.put(LISTING_TIMESTAMP_KEY, 
String.valueOf(listingTimestamp));
-        updatedState.put(PROCESSED_TIMESTAMP_KEY, 
String.valueOf(processedTimestamp));
+        updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, 
String.valueOf(latestListedEntryTimestampThisCycleMillis));
+        updatedState.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, 
String.valueOf(lastProcessedLatestEntryTimestampMillis));
         stateManager.setState(updatedState, scope);
     }
 
@@ -303,26 +343,26 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        Long minTimestamp = lastListingTime;
+        Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis;
 
-        if (this.lastListingTime == null || this.lastProcessedTime == null || 
justElectedPrimaryNode) {
+        if (this.lastListedLatestEntryTimestampMillis == null || 
this.lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) 
{
             try {
                 // Attempt to retrieve state from the state manager if a last 
listing was not yet established or
                 // if just elected the primary node
                 final StateMap stateMap = 
context.getStateManager().getState(getStateScope(context));
-                final String listingTimestampString = 
stateMap.get(LISTING_TIMESTAMP_KEY);
-                final String lastProcessedString= 
stateMap.get(PROCESSED_TIMESTAMP_KEY);
-                if (lastProcessedString != null) {
-                    this.lastProcessedTime = 
Long.parseLong(lastProcessedString);
+                final String latestListedEntryTimestampString = 
stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY);
+                final String lastProcessedLatestEntryTimestampString= 
stateMap.get(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY);
+                if (lastProcessedLatestEntryTimestampString != null) {
+                    this.lastProcessedLatestEntryTimestampMillis = 
Long.parseLong(lastProcessedLatestEntryTimestampString);
                 }
-                if (listingTimestampString != null) {
-                    minTimestamp = Long.parseLong(listingTimestampString);
+                if (latestListedEntryTimestampString != null) {
+                    minTimestampToListMillis = 
Long.parseLong(latestListedEntryTimestampString);
                     // If our determined timestamp is the same as that of our 
last listing, skip this execution as there are no updates
-                    if (minTimestamp == this.lastListingTime) {
+                    if (minTimestampToListMillis == 
this.lastListedLatestEntryTimestampMillis) {
                         context.yield();
                         return;
                     } else {
-                        this.lastListingTime = minTimestamp;
+                        this.lastListedLatestEntryTimestampMillis = 
minTimestampToListMillis;
                     }
                 }
                 justElectedPrimaryNode = false;
@@ -334,10 +374,11 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         }
 
         final List<T> entityList;
-        final long currentListingTimestamp = System.nanoTime();
+        final long currentRunTimeNanos = System.nanoTime();
+        final long currentRunTimeMillis = System.currentTimeMillis();
         try {
             // track of when this last executed for consideration of the lag 
nanos
-            entityList = performListing(context, minTimestamp);
+            entityList = performListing(context, minTimestampToListMillis);
         } catch (final IOException e) {
             getLogger().error("Failed to perform listing on remote host due to 
{}", e);
             context.yield();
@@ -349,14 +390,22 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
             return;
         }
 
-        Long latestListingTimestamp = null;
+        Long latestListedEntryTimestampThisCycleMillis = null;
         final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
 
         // Build a sorted map to determine the latest possible entries
+        boolean targetSystemHasMilliseconds = false;
+        boolean targetSystemHasSeconds = false;
         for (final T entity : entityList) {
-            final long entityTimestamp = entity.getTimestamp();
+            final long entityTimestampMillis = entity.getTimestamp();
+            if (!targetSystemHasMilliseconds) {
+                targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0;
+            }
+            if (!targetSystemHasSeconds) {
+                targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
+            }
             // New entries are all those that occur at or after the associated 
timestamp
-            final boolean newEntry = minTimestamp == null || entityTimestamp 
>= minTimestamp && entityTimestamp > lastProcessedTime;
+            final boolean newEntry = minTimestampToListMillis == null || 
entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis > 
lastProcessedLatestEntryTimestampMillis;
 
             if (newEntry) {
                 List<T> entitiesForTimestamp = 
orderedEntries.get(entity.getTimestamp());
@@ -371,25 +420,43 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         int flowfilesCreated = 0;
 
         if (orderedEntries.size() > 0) {
-            latestListingTimestamp = orderedEntries.lastKey();
+            latestListedEntryTimestampThisCycleMillis = 
orderedEntries.lastKey();
+
+            // Determine target system time precision.
+            final String specifiedPrecision = 
context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
+            final TimeUnit targetSystemTimePrecision
+                    = 
PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
+                        ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS 
: targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
+                    : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? 
TimeUnit.MILLISECONDS
+                    : PRECISION_SECONDS.getValue().equals(specifiedPrecision) 
? TimeUnit.SECONDS : TimeUnit.MINUTES;
+            final Long listingLagMillis = 
LISTING_LAG_MILLIS.get(targetSystemTimePrecision);
 
             // If the last listing time is equal to the newest entries 
previously seen,
             // another iteration has occurred without new files and special 
handling is needed to avoid starvation
-            if (latestListingTimestamp.equals(lastListingTime)) {
-                /* We are done when either:
-                 *   - the latest listing timestamp is If we have not eclipsed 
the minimal listing lag needed due to being triggered too soon after the last 
run
-                 *   - the latest listing timestamp is equal to the last 
processed time, meaning we handled those items originally passed over
+            if 
(latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis))
 {
+                /* We need to wait for another cycle when either:
+                 *   - If we have not eclipsed the minimal listing lag needed 
due to being triggered too soon after the last run
+                 *   - The latest listed entity timestamp is equal to the last 
processed time, meaning we handled those items originally passed over. No need 
to process it again.
                  */
-                if (System.nanoTime() - lastRunTime < LISTING_LAG_NANOS || 
latestListingTimestamp.equals(lastProcessedTime)) {
+                final long  listingLagNanos = 
TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
+                if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos 
|| 
latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis))
 {
                     context.yield();
                     return;
                 }
 
-            } else if (latestListingTimestamp >= currentListingTimestamp - 
LISTING_LAG_NANOS) {
-                // Otherwise, newest entries are held back one cycle to avoid 
issues in writes occurring exactly when the listing is being performed to avoid 
missing data
-                orderedEntries.remove(latestListingTimestamp);
+            } else {
+                // Convert minimum reliable timestamp into target system time 
unit, in order to truncate unreliable digits.
+                final long minimumReliableTimestampInFilesystemTimeUnit = 
targetSystemTimePrecision.convert(currentRunTimeMillis - listingLagMillis, 
TimeUnit.MILLISECONDS);
+                final long minimumReliableTimestampMillis = 
targetSystemTimePrecision.toMillis(minimumReliableTimestampInFilesystemTimeUnit);
+                // If the latest listed entity is not old enough, compared 
with the minimum timestamp, then wait for another cycle.
+                // The minimum timestamp should be reliable to determine that 
no further entries will be added with the same timestamp based on the target 
system time precision.
+                if (minimumReliableTimestampMillis < 
latestListedEntryTimestampThisCycleMillis) {
+                    // Otherwise, newest entries are held back one cycle to 
avoid issues in writes occurring exactly when the listing is being performed to 
avoid missing data
+                    
orderedEntries.remove(latestListedEntryTimestampThisCycleMillis);
+                }
             }
 
+
             for (List<T> timestampEntities : orderedEntries.values()) {
                 for (T entity : timestampEntities) {
                     // Create the FlowFile for this path.
@@ -403,18 +470,20 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         }
 
         // As long as we have a listing timestamp, there is meaningful state 
to capture regardless of any outputs generated
-        if (latestListingTimestamp != null) {
+        if (latestListedEntryTimestampThisCycleMillis != null) {
             boolean processedNewFiles = flowfilesCreated > 0;
             if (processedNewFiles) {
-                // If there have been files created, update the last timestamp 
we processed
-                lastProcessedTime = orderedEntries.lastKey();
+                // If there have been files created, update the last timestamp 
we processed.
+                // Retrieving lastKey instead of using 
latestListedEntryTimestampThisCycleMillis is intentional here,
+                // because latestListedEntryTimestampThisCycleMillis might be 
removed if it's not old enough.
+                lastProcessedLatestEntryTimestampMillis = 
orderedEntries.lastKey();
                 getLogger().info("Successfully created listing with {} new 
objects", new Object[]{flowfilesCreated});
                 session.commit();
             }
 
-            lastRunTime = System.nanoTime();
+            lastRunTimeNanos = currentRunTimeNanos;
 
-            if (!latestListingTimestamp.equals(lastListingTime) || 
processedNewFiles) {
+            if 
(!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis)
 || processedNewFiles) {
                 // We have performed a listing and pushed any FlowFiles out 
that may have been generated
                 // Now, we need to persist state about the Last Modified 
timestamp of the newest file
                 // that we evaluated. We do this in order to avoid pulling in 
the same file twice.
@@ -424,8 +493,8 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
                 // We also store the state locally so that if the node is 
restarted, and the node cannot contact
                 // the distributed state cache, the node can continue to run 
(if it is primary node).
                 try {
-                    lastListingTime = latestListingTimestamp;
-                    persist(latestListingTimestamp, lastProcessedTime, 
context.getStateManager(), getStateScope(context));
+                    lastListedLatestEntryTimestampMillis = 
latestListedEntryTimestampThisCycleMillis;
+                    persist(latestListedEntryTimestampThisCycleMillis, 
lastProcessedLatestEntryTimestampMillis, context.getStateManager(), 
getStateScope(context));
                 } catch (final IOException ioe) {
                     getLogger().warn("Unable to save state due to {}. If NiFi 
is restarted before state is saved, or "
                         + "if another node begins executing this Processor, 
data duplication may occur.", ioe);
@@ -437,8 +506,8 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
             context.yield();
 
             // lastListingTime = 0 so that we don't continually poll the 
distributed cache / local file system
-            if (lastListingTime == null) {
-                lastListingTime = 0L;
+            if (lastListedLatestEntryTimestampMillis == null) {
+                lastListedLatestEntryTimestampMillis = 0L;
             }
 
             return;
@@ -446,9 +515,9 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
     }
 
     private void resetTimeStates() {
-        lastListingTime = null;
-        lastProcessedTime = 0L;
-        lastRunTime = 0L;
+        lastListedLatestEntryTimestampMillis = null;
+        lastProcessedLatestEntryTimestampMillis = 0L;
+        lastRunTimeNanos = 0L;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListProcessorTestWatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListProcessorTestWatcher.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListProcessorTestWatcher.java
new file mode 100644
index 0000000..dcb53e0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListProcessorTestWatcher.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides a way to dump list-able entities, processor state and 
transferred FlowFiles into 'success' relationship,
+ * which is useful to debug test issues especially at automation test 
environment such as Travis that is difficult to debug.
+ */
+public class ListProcessorTestWatcher extends TestWatcher {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ListProcessorTestWatcher.class);
+    private static final Consumer<String> logStateDump = logger::info;
+
+    @FunctionalInterface
+    public interface Provider<T> {
+        T provide();
+    }
+
+    private final SimpleDateFormat dateFormat = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+    private final Provider<Map<String, String>> stateMapProvider;
+    private final Provider<List<ListableEntity>> entitiesProvider;
+    private final Provider<List<FlowFile>> successFlowFilesProvider;
+
+    private long startedAtMillis;
+
+    public ListProcessorTestWatcher(Provider<Map<String, String>> 
stateMapProvider, Provider<List<ListableEntity>> entitiesProvider, 
Provider<List<FlowFile>> successFlowFilesProvider) {
+        this.stateMapProvider = stateMapProvider;
+        this.entitiesProvider = entitiesProvider;
+        this.successFlowFilesProvider = successFlowFilesProvider;
+    }
+
+    private void log(Consumer<String> dumper, String format, Object ... args) {
+        dumper.accept(String.format(format, args));
+    }
+
+    public void dumpState(final long start) {
+        dumpState(logStateDump, stateMapProvider.provide(), 
entitiesProvider.provide(), successFlowFilesProvider.provide(), start);
+    }
+
+    private void dumpState(Consumer<String> d, final Map<String, String> 
state, final List<ListableEntity> entities, final List<FlowFile> flowFiles, 
final long start) {
+
+        final long nTime = System.currentTimeMillis();
+        log(d, 
"--------------------------------------------------------------------");
+        log(d, "%-19s   %-13s %-23s %s", "", "timestamp", "date from 
timestamp", "t0 delta");
+        log(d, "%-19s   %-13s %-23s %s", "-------------------", 
"-------------", "-----------------------", "--------");
+        log(d, "%-19s = %13d %s %8d", "started at", start, 
dateFormat.format(start), 0);
+        log(d, "%-19s = %13d %s %8d", "current time", nTime, 
dateFormat.format(nTime), 0);
+        log(d, "---- processor state 
-----------------------------------------------");
+        if (state.containsKey("processed.timestamp")) {
+            final long pTime = 
Long.parseLong(state.get("processed.timestamp"));
+            log(d, "%19s = %13d %s %8d", "processed.timestamp", pTime, 
dateFormat.format(pTime), pTime - nTime);
+        } else {
+            log(d, "%19s = na", "processed.timestamp");
+        }
+        if (state.containsKey("listing.timestamp")) {
+            final long lTime = Long.parseLong(state.get("listing.timestamp"));
+            log(d, "%19s = %13d %s %8d", "listing.timestamp", lTime, 
dateFormat.format(lTime), lTime - nTime);
+        } else {
+            log(d, "%19s = na", "listing.timestamp");
+        }
+        log(d, "---- input folder contents 
-----------------------------------------");
+        entities.sort(Comparator.comparing(ListableEntity::getIdentifier));
+        for (ListableEntity entity : entities) {
+            log(d, "%19s = %12d %s %8d", entity.getIdentifier(), 
entity.getTimestamp(), dateFormat.format(entity.getTimestamp()), 
entity.getTimestamp() - nTime);
+        }
+        log(d, "---- output flowfiles 
----------------------------------------------");
+        final Map<String, Long> fileTimes = 
entities.stream().collect(Collectors.toMap(ListableEntity::getIdentifier, 
ListableEntity::getTimestamp));
+        for (FlowFile ff : flowFiles) {
+            String fName = ff.getAttribute(CoreAttributes.FILENAME.key());
+            Long fTime = fileTimes.get(fName);
+            log(d, "%19s = %13d %s %8d", fName, fTime, 
dateFormat.format(fTime), fTime - nTime);
+        }
+        log(d, "REL_SUCCESS count = " + flowFiles.size());
+        log(d, 
"--------------------------------------------------------------------");
+        log(d, "");
+    }
+
+    @Override
+    protected void starting(Description description) {
+        startedAtMillis = System.currentTimeMillis();
+    }
+
+    /**
+     * Throw additional AssertionError with stateDump as its message.
+     */
+    @Override
+    protected void failed(Throwable e, Description description) {
+        if (!(e instanceof AssertionError)) {
+            return;
+        }
+
+        final StringBuilder msg = new StringBuilder("State dump:\n");
+        dumpState(s -> msg.append(s).append("\n"),
+                stateMapProvider.provide(),
+                entitiesProvider.provide(),
+                successFlowFilesProvider.provide(),
+                startedAtMillis);
+        throw new AssertionError(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java
index 3c7c08d..01837cb 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java
@@ -32,7 +32,7 @@ public interface ListableEntity {
 
 
     /**
-     * @return the timestamp for this entity so that we can be efficient about 
not performing listings of the same
+     * @return the timestamp for this entity in milliseconds so that we can be 
efficient about not performing listings of the same
      *         entities multiple times
      */
     long getTimestamp();

http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
index 2417d52..69705f2 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
@@ -31,6 +31,7 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.Charsets;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -40,37 +41,78 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.list.AbstractListProcessor;
-import org.apache.nifi.processor.util.list.ListableEntity;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestWatcher;
 
 public class TestAbstractListProcessor {
 
-    static final long DEFAULT_SLEEP_MILLIS = 
TimeUnit.NANOSECONDS.toMillis(AbstractListProcessor.LISTING_LAG_NANOS * 2);
+    /**
+     * @return the current timestamp in milliseconds, truncated to the specified 
target precision (e.g. SECONDS or MINUTES).
+     */
+    private static long getCurrentTimestampMillis(final TimeUnit 
targetPrecision) {
+        final long timestampInTargetPrecision = 
targetPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        return TimeUnit.MILLISECONDS.convert(timestampInTargetPrecision, 
targetPrecision);
+    }
+
+    private static long getSleepMillis(final TimeUnit targetPrecision) {
+        return AbstractListProcessor.LISTING_LAG_MILLIS.get(targetPrecision) * 
2;
+    }
+
+    private static final long DEFAULT_SLEEP_MILLIS = 
getSleepMillis(TimeUnit.MILLISECONDS);
+
+    private ConcreteListProcessor proc;
+    private TestRunner runner;
+
+    @Rule
+    public TestWatcher dumpState = new ListProcessorTestWatcher(
+            () -> {
+                try {
+                    return 
runner.getStateManager().getState(Scope.LOCAL).toMap();
+                } catch (IOException e) {
+                    throw new RuntimeException("Failed to retrieve state", e);
+                }
+            },
+            () -> proc.entities,
+            () -> 
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m
 -> (FlowFile) m).collect(Collectors.toList())
+    );
+
+    @Before
+    public void setup() {
+        proc = new ConcreteListProcessor();
+        runner = TestRunners.newTestRunner(proc);
+    }
 
     @Rule
     public final TemporaryFolder testFolder = new TemporaryFolder();
 
+    /**
+     * <p>Ensures that files are listed when they are old enough:
+     *   <li>Files whose last modified timestamp is old enough to 
determine that they are completely written
+     *     and that no further files are expected to be added with the same 
timestamp.</li>
+     *   <li>This behavior is expected when a processor is scheduled less 
frequently, such as hourly or daily.</li>
+     * </p>
+     */
     @Test
     public void testAllExistingEntriesEmittedOnFirstIteration() throws 
Exception {
-        final long oldTimestamp = System.nanoTime() - 
(AbstractListProcessor.LISTING_LAG_NANOS * 2);
+        final long oldTimestamp = System.currentTimeMillis() - 
getSleepMillis(TimeUnit.MILLISECONDS);
 
         // These entries have existed before the processor runs at the first 
time.
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
         proc.addEntity("name", "id", oldTimestamp);
         proc.addEntity("name", "id2", oldTimestamp);
 
         // First run, the above listed entries should be emitted since it has 
existed.
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-
         runner.run();
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
         runner.clearTransferState();
@@ -83,13 +125,10 @@ public class TestAbstractListProcessor {
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
     }
 
-    @Test
-    public void testPreviouslySkippedEntriesEmittedOnNextIteration() throws 
Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
+    private void testPreviouslySkippedEntriesEmmitedOnNextIteration(final 
TimeUnit targetPrecision) throws InterruptedException {
         runner.run();
 
-        final long initialTimestamp = System.nanoTime();
+        final long initialTimestamp = 
getCurrentTimestampMillis(targetPrecision);
 
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         proc.addEntity("name", "id", initialTimestamp);
@@ -101,20 +140,47 @@ public class TestAbstractListProcessor {
         runner.clearTransferState();
 
         // Ensure we have covered the necessary lag period to avoid issues 
where the processor was immediately scheduled to run again
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+        Thread.sleep(getSleepMillis(targetPrecision));
 
         // Run again without introducing any new entries
         runner.run();
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
     }
 
+    /**
+     * <p>Ensures that newly created files wait until it is confirmed that no 
more files are created with the same timestamp:
+     *   <li>If files have the latest modified timestamp at an iteration, then 
their listing should be postponed</li>
+     *   <li>If those files are still the latest files at the next iteration, 
then they should be listed</li>
+     * </p>
+     */
     @Test
-    public void testOnlyNewEntriesEmitted() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.run();
+    public void 
testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision() throws 
Exception {
+        
testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Same as {@link 
#testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} but 
simulates that the target
+     * filesystem only provides timestamp precision in Seconds.
+     */
+    @Test
+    public void 
testPreviouslySkippedEntriesEmittedOnNextIterationSecondPrecision() throws 
Exception {
+        testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.SECONDS);
+    }
+
+    /**
+     * Same as {@link 
#testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} but 
simulates that the target
+     * filesystem only provides timestamp precision in Minutes.
+     * This test is ignored because it needs to wait two minutes. Not good for 
automated unit testing, but still valuable when executed manually.
+     */
+    @Ignore
+    @Test
+    public void 
testPreviouslySkippedEntriesEmittedOnNextIterationMinutesPrecision() throws 
Exception {
+        testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MINUTES);
+    }
+
+    private void testOnlyNewEntriesEmitted(final TimeUnit targetPrecision) 
throws InterruptedException {
 
-        final long initialTimestamp = System.nanoTime();
+        final long initialTimestamp = 
getCurrentTimestampMillis(targetPrecision);
 
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         proc.addEntity("name", "id", initialTimestamp);
@@ -126,7 +192,7 @@ public class TestAbstractListProcessor {
         runner.clearTransferState();
 
         // Ensure we have covered the necessary lag period to avoid issues 
where the processor was immediately scheduled to run again
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+        Thread.sleep(getSleepMillis(targetPrecision));
 
         // Running again, our two previously seen files are now cleared to be 
released
         runner.run();
@@ -139,18 +205,20 @@ public class TestAbstractListProcessor {
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();
 
-        proc.addEntity("name", "id3", initialTimestamp - 1);
+        // An entry that is older than an already processed entry should not be 
listed.
+        proc.addEntity("name", "id3", initialTimestamp - 
targetPrecision.toMillis(1));
         runner.run();
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();
 
+        // An entry whose timestamp is the same as the last processed 
timestamp should not be listed.
         proc.addEntity("name", "id2", initialTimestamp);
         runner.run();
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();
 
         // Now a new file beyond the current time enters
-        proc.addEntity("name", "id2", initialTimestamp + 1);
+        proc.addEntity("name", "id2", initialTimestamp + 
targetPrecision.toMillis(1));
 
         // It should show up
         runner.run();
@@ -159,19 +227,36 @@ public class TestAbstractListProcessor {
     }
 
     @Test
+    public void testOnlyNewEntriesEmittedMillisPrecision() throws Exception {
+        testOnlyNewEntriesEmitted(TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testOnlyNewEntriesEmittedSecondPrecision() throws Exception {
+        testOnlyNewEntriesEmitted(TimeUnit.SECONDS);
+    }
+
+    /**
+     * This test is ignored because it needs to wait two minutes. Not good for 
automated unit testing, but still valuable when executed manually.
+     */
+    @Ignore
+    @Test
+    public void testOnlyNewEntriesEmittedMinutesPrecision() throws Exception {
+        testOnlyNewEntriesEmitted(TimeUnit.MINUTES);
+    }
+
+    @Test
     public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() 
throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
 
-        final long initialTimestamp = System.nanoTime();
+        final long initialTimestamp = System.currentTimeMillis();
 
         proc.addEntity("name", "id", initialTimestamp);
         proc.addEntity("name", "id2", initialTimestamp);
 
         // Emulate having state but not having had the processor run such as 
in a restart
         final Map<String, String> preexistingState = new HashMap<>();
-        preexistingState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, 
Long.toString(initialTimestamp));
-        preexistingState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, 
Long.toString(initialTimestamp));
+        
preexistingState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, 
Long.toString(initialTimestamp));
+        
preexistingState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY,
 Long.toString(initialTimestamp));
         runner.getStateManager().setState(preexistingState, Scope.CLUSTER);
 
         // run for the first time
@@ -216,37 +301,35 @@ public class TestAbstractListProcessor {
 
     @Test
     public void testStateStoredInClusterStateManagement() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
+
         final DistributedCache cache = new DistributedCache();
         runner.addControllerService("cache", cache);
         runner.enableControllerService(cache);
         runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, 
"cache");
 
-        final long initialTimestamp = System.nanoTime();
+        final long initialTimestamp = System.currentTimeMillis();
 
         proc.addEntity("name", "id", initialTimestamp);
         runner.run();
 
         final Map<String, String> expectedState = new HashMap<>();
         // Ensure only timestamp is migrated
-        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, 
String.valueOf(initialTimestamp));
-        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "0");
+        
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, 
String.valueOf(initialTimestamp));
+        
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY,
 "0");
         runner.getStateManager().assertStateEquals(expectedState, 
Scope.CLUSTER);
 
         Thread.sleep(DEFAULT_SLEEP_MILLIS);
 
         runner.run();
         // Ensure only timestamp is migrated
-        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, 
String.valueOf(initialTimestamp));
-        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, 
String.valueOf(initialTimestamp));
+        
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, 
String.valueOf(initialTimestamp));
+        
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY,
 String.valueOf(initialTimestamp));
         runner.getStateManager().assertStateEquals(expectedState, 
Scope.CLUSTER);
     }
 
     @Test
     public void testStateMigratedFromCacheService() throws 
InitializationException {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
+
         final DistributedCache cache = new DistributedCache();
         runner.addControllerService("cache", cache);
         runner.enableControllerService(cache);
@@ -261,15 +344,13 @@ public class TestAbstractListProcessor {
         final MockStateManager stateManager = runner.getStateManager();
         final Map<String, String> expectedState = new HashMap<>();
         // Ensure only timestamp is migrated
-        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
-        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, 
"1492");
+        
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, 
"1492");
+        
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY,
 "1492");
         stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
     }
 
     @Test
     public void testNoStateToMigrate() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
 
         runner.run();
 
@@ -280,8 +361,6 @@ public class TestAbstractListProcessor {
 
     @Test
     public void testStateMigratedFromLocalFile() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
 
         // Create a file that we will populate with the desired state
         File persistenceFile = testFolder.newFile(proc.persistenceFilename);
@@ -305,20 +384,17 @@ public class TestAbstractListProcessor {
         // Verify the state manager now maintains the associated state
         final Map<String, String> expectedState = new HashMap<>();
         // Ensure only timestamp is migrated
-        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
-        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, 
"1492");
+        
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, 
"1492");
+        
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY,
 "1492");
         runner.getStateManager().assertStateEquals(expectedState, 
Scope.CLUSTER);
     }
 
     @Test
     public void testResumeListingAfterClearingState() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.run();
 
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
 
-        final long initialEventTimestamp = System.nanoTime();
+        final long initialEventTimestamp = System.currentTimeMillis();
         proc.addEntity("name", "id", initialEventTimestamp);
         proc.addEntity("name", "id2", initialEventTimestamp);
 
@@ -350,8 +426,7 @@ public class TestAbstractListProcessor {
 
     @Test
     public void testFetchOnStart() throws InitializationException {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
+
         final DistributedCache cache = new DistributedCache();
         runner.addControllerService("cache", cache);
         runner.enableControllerService(cache);
@@ -364,11 +439,10 @@ public class TestAbstractListProcessor {
 
     @Test
     public void testOnlyNewStateStored() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
+
         runner.run();
 
-        final long initialTimestamp = System.nanoTime();
+        final long initialTimestamp = System.currentTimeMillis();
 
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         proc.addEntity("name", "id", initialTimestamp);
@@ -390,8 +464,8 @@ public class TestAbstractListProcessor {
         final Map<String, String> map = stateMap.toMap();
         // Ensure only timestamp is migrated
         assertEquals(2, map.size());
-        assertEquals(Long.toString(initialTimestamp), 
map.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
-        assertEquals(Long.toString(initialTimestamp), 
map.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
+        assertEquals(Long.toString(initialTimestamp), 
map.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
+        assertEquals(Long.toString(initialTimestamp), 
map.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
 
         proc.addEntity("new name", "new id", initialTimestamp + 1);
         runner.run();
@@ -403,9 +477,9 @@ public class TestAbstractListProcessor {
         assertEquals(3, updatedStateMap.getVersion());
 
         assertEquals(2, updatedStateMap.toMap().size());
-        assertEquals(Long.toString(initialTimestamp + 1), 
updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
+        assertEquals(Long.toString(initialTimestamp + 1), 
updatedStateMap.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
         // Processed timestamp is now caught up
-        assertEquals(Long.toString(initialTimestamp + 1), 
updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
+        assertEquals(Long.toString(initialTimestamp + 1), 
updatedStateMap.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
     }
 
     private static class DistributedCache extends AbstractControllerService 
implements DistributedMapCacheClient {
@@ -502,7 +576,9 @@ public class TestAbstractListProcessor {
 
         @Override
         protected Map<String, String> createAttributes(final ListableEntity 
entity, final ProcessContext context) {
-            return Collections.emptyMap();
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), 
entity.getIdentifier());
+            return attributes;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
index f445588..8204830 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
@@ -84,6 +84,7 @@ public class ListFTP extends ListFileTransfer {
         properties.add(FTPTransfer.PROXY_PORT);
         properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
         properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
+        properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
         return properties;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
index 33d7867..5f2e2d2 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
@@ -213,6 +213,7 @@ public class ListFile extends 
AbstractListProcessor<FileInfo> {
         properties.add(MIN_SIZE);
         properties.add(MAX_SIZE);
         properties.add(IGNORE_HIDDEN_FILES);
+        properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
         this.properties = Collections.unmodifiableList(properties);
 
         final Set<Relationship> relationships = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index cb5a7e7..b7805e9 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -82,6 +82,7 @@ public class ListSFTP extends ListFileTransfer {
         properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
         properties.add(SFTPTransfer.DATA_TIMEOUT);
         properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
+        properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
         return properties;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index 26dcbbf..1b5b2a4 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -30,54 +30,87 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.list.AbstractListProcessor;
+import org.apache.nifi.processor.util.list.ListProcessorTestWatcher;
+import org.apache.nifi.processors.standard.util.FileInfo;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.Description;
 
 public class TestListFile {
 
-    final String TESTDIR = "target/test/data/in";
-    final File testDir = new File(TESTDIR);
-    ListFile processor;
-    TestRunner runner;
-    ProcessContext context;
+    private final String TESTDIR = "target/test/data/in";
+    private final File testDir = new File(TESTDIR);
+    private ListFile processor;
+    private TestRunner runner;
+    private ProcessContext context;
 
     // Testing factors in milliseconds for file ages that are configured on 
each run by resetAges()
     // age#millis are relative time references
     // time#millis are absolute time references
     // age#filter are filter label strings for the filter properties
-    Long syncTime = System.currentTimeMillis();
-    Long time0millis, time1millis, time2millis, time3millis, time4millis, 
time5millis;
-    Long age0millis, age1millis, age2millis, age3millis, age4millis, 
age5millis;
-    String age0, age1, age2, age3, age4, age5;
-
-    static final long DEFAULT_SLEEP_MILLIS = 
TimeUnit.NANOSECONDS.toMillis(AbstractListProcessor.LISTING_LAG_NANOS * 2);
+    private Long syncTime = getTestModifiedTime();
+    private Long time0millis, time1millis, time2millis, time3millis, 
time4millis, time5millis;
+    private Long age0millis, age1millis, age2millis, age3millis, age4millis, 
age5millis;
+    private String age0, age1, age2, age3, age4, age5;
+
+    @Rule
+    public ListProcessorTestWatcher dumpState = new ListProcessorTestWatcher(
+            () -> {
+                try {
+                    return 
runner.getStateManager().getState(Scope.LOCAL).toMap();
+                } catch (IOException e) {
+                    throw new RuntimeException("Failed to retrieve state", e);
+                }
+            },
+            () -> listFiles(testDir).stream()
+                    .map(f -> new 
FileInfo.Builder().filename(f.getName()).lastModifiedTime(f.lastModified()).build()).collect(Collectors.toList()),
+            () -> 
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m
 -> (FlowFile) m).collect(Collectors.toList())
+    ) {
+        @Override
+        protected void finished(Description description) {
+            try {
+                // In order to refer to files in testDir, we want to execute this 
rule before tearDown, because tearDown removes files.
+                // @After is always executed before @Rule, so tearDown is called here instead.
+                tearDown();
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to tearDown.", e);
+            }
+        }
+    };
 
     @Before
     public void setUp() throws Exception {
         processor = new ListFile();
         runner = TestRunners.newTestRunner(processor);
+        
runner.setProperty(AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, 
AbstractListProcessor.PRECISION_SECONDS.getValue());
         context = runner.getProcessContext();
         deleteDirectory(testDir);
         assertTrue("Unable to create test data directory " + 
testDir.getAbsolutePath(), testDir.exists() || testDir.mkdirs());
         resetAges();
     }
 
-    @After
     public void tearDown() throws Exception {
         deleteDirectory(testDir);
         File tempFile = processor.getPersistenceFile();
@@ -91,14 +124,38 @@ public class TestListFile {
         }
     }
 
+    private List<File> listFiles(final File file) {
+        if (file.isDirectory()) {
+            final List<File> result = new ArrayList<>();
+            Optional.ofNullable(file.listFiles()).ifPresent(files -> 
Arrays.stream(files).forEach(f -> result.addAll(listFiles(f))));
+            return result;
+        } else {
+            return Collections.singletonList(file);
+        }
+    }
+
     /**
      * This method ensures runner clears transfer state,
-     * and sleeps the current thread for DEFAULT_SLEEP_MILLIS before executing 
runner.run().
+     * and sleeps the current thread for twice the lag defined in {@link 
AbstractListProcessor#LISTING_LAG_MILLIS}
+     * for the local filesystem timestamp precision before executing 
runner.run().
      */
     private void runNext() throws InterruptedException {
         runner.clearTransferState();
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+        final List<File> files = listFiles(testDir);
+        final boolean isMillisecondSupported = files.stream().anyMatch(file -> 
file.lastModified() % 1_000 > 0);
+        final Long lagMillis;
+        if (isMillisecondSupported) {
+            lagMillis = 
AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS);
+        } else {
+            // Filesystems such as Mac OS X HFS (Hierarchical File System) or 
EXT3 are known to only support timestamps with seconds precision.
+            lagMillis = 
AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS);
+        }
+        Thread.sleep(lagMillis * 2);
+
+        final long startedAtMillis = System.currentTimeMillis();
         runner.run();
+        dumpState.dumpState(startedAtMillis);
     }
 
     @Test
@@ -305,7 +362,7 @@ public class TestListFile {
 
     @Test
     public void testFilterHidden() throws Exception {
-        final long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), 
TimeUnit.NANOSECONDS);
+        final long now = getTestModifiedTime();
 
         FileOutputStream fos;
 
@@ -388,7 +445,7 @@ public class TestListFile {
 
     @Test
     public void testFilterPathPattern() throws Exception {
-        final long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), 
TimeUnit.NANOSECONDS);
+        final long now = getTestModifiedTime();
 
         final File subdir1 = new File(TESTDIR + "/subdir1");
         assertTrue(subdir1.mkdirs());
@@ -595,20 +652,20 @@ public class TestListFile {
      * Provides "now" minus 1 second in millis
     */
     private static long getTestModifiedTime() {
-        final long nowNanos = System.nanoTime();
+        final long nowMillis = System.currentTimeMillis();
         // Subtract a second to avoid possible rounding issues
-        final long nowSeconds = TimeUnit.SECONDS.convert(nowNanos, 
TimeUnit.NANOSECONDS) - 1;
+        final long nowSeconds = TimeUnit.SECONDS.convert(nowMillis, 
TimeUnit.MILLISECONDS) - 1;
         return TimeUnit.MILLISECONDS.convert(nowSeconds, TimeUnit.SECONDS);
     }
 
-    public void resetAges() {
-        syncTime = System.currentTimeMillis();
+    private void resetAges() {
+        syncTime = getTestModifiedTime();
 
         age0millis = 0L;
-        age1millis = 2000L;
-        age2millis = 5000L;
-        age3millis = 7000L;
-        age4millis = 10000L;
+        age1millis = 5000L;
+        age2millis = 10000L;
+        age3millis = 15000L;
+        age4millis = 20000L;
         age5millis = 100000L;
 
         time0millis = syncTime - age0millis;

Reply via email to