NIFI-3332: ListXXX to not miss files with the latest processed timestamp

Before this fix, ListXXX processors could miss files that have the same
timestamp as the latest processed timestamp of the previous cycle. Since only
timestamps were used, it was not possible to determine whether a file had
already been processed or not.

However, storing every single processed identifier, as we used to, will not
perform well.
Instead, this commit makes ListXXX store only the identifiers that have the
latest timestamp in a cycle, to minimize the amount of state data to store.

NIFI-3332: ListXXX to not miss files with the latest processed timestamp

- Fixed TestAbstractListProcessor to use the appropriate time precision.
  Without this fix, an arbitrary test can fail if the generated timestamp
  does not have the desired time unit value, e.g. a generated '10:51:00'
  when second precision is tested.
- Fixed TestFTP.basicFileList to use millisecond time precision explicitly
  because FakeFtpServer's time precision is in minutes.
- Changed junit dependency scope to 'provided' as it is needed by
  ListProcessorTestWatcher which is shared among different modules.

This closes #1975.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e68ff153
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e68ff153
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e68ff153

Branch: refs/heads/master
Commit: e68ff153e81ddb82d1136d44a96bdb7a70da86d1
Parents: 28ee702
Author: Koji Kawamura <[email protected]>
Authored: Tue Jul 4 17:34:31 2017 +0900
Committer: Bryan Bende <[email protected]>
Committed: Mon Aug 28 11:31:04 2017 -0400

----------------------------------------------------------------------
 .../nifi-processor-utils/pom.xml                |  3 +-
 .../util/list/AbstractListProcessor.java        | 73 ++++++++++++++------
 .../util/list/TestAbstractListProcessor.java    | 37 ++++++++--
 .../nifi/processors/standard/TestFTP.java       | 10 ++-
 .../nifi/processors/standard/TestListFile.java  | 40 +++++++++++
 5 files changed, 137 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e68ff153/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
index dd38e10..a86dda1 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
@@ -68,9 +68,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <!-- Dependency marked as provided, not test, because 
ListProcessorTestWatcher uses TestWatcher -->
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <scope>compile</scope>
+            <scope>provided</scope>
         </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e68ff153/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 8d93a65..52049ed 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -32,6 +32,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
@@ -178,6 +179,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
     private volatile Long lastRunTimeNanos = 0L;
     private volatile boolean justElectedPrimaryNode = false;
     private volatile boolean resetState = false;
+    private volatile List<String> latestIdentifiersProcessed = new 
ArrayList<>();
 
     /*
      * A constant used in determining an internal "yield" of processing files. 
Given the logic to provide a pause on the newest
@@ -194,6 +196,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
     }
     static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = 
"listing.timestamp";
     static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = 
"processed.timestamp";
+    static final String IDENTIFIER_PREFIX = "id";
 
     public File getPersistenceFile() {
         return new File("conf/state/" + getIdentifier());
@@ -307,6 +310,8 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
                 // if the local file's latest timestamp is beyond that of the 
value provided from the cache, replace
                 if (minTimestamp == null || localTimestamp > minTimestamp) {
                     minTimestamp = localTimestamp;
+                    latestIdentifiersProcessed.clear();
+                    
latestIdentifiersProcessed.addAll(listing.getMatchingIdentifiers());
                 }
             }
 
@@ -317,16 +322,20 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         }
 
         if (minTimestamp != null) {
-            persist(minTimestamp, minTimestamp, stateManager, scope);
+            persist(minTimestamp, minTimestamp, latestIdentifiersProcessed, 
stateManager, scope);
         }
     }
 
     private void persist(final long latestListedEntryTimestampThisCycleMillis,
                          final long lastProcessedLatestEntryTimestampMillis,
+                         final List<String> 
processedIdentifiesWithLatestTimestamp,
                          final StateManager stateManager, final Scope scope) 
throws IOException {
-        final Map<String, String> updatedState = new HashMap<>(1);
+        final Map<String, String> updatedState = new 
HashMap<>(processedIdentifiesWithLatestTimestamp.size() + 2);
         updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, 
String.valueOf(latestListedEntryTimestampThisCycleMillis));
         updatedState.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, 
String.valueOf(lastProcessedLatestEntryTimestampMillis));
+        for (int i = 0; i < processedIdentifiesWithLatestTimestamp.size(); 
i++) {
+            updatedState.put(IDENTIFIER_PREFIX + "." + i, 
processedIdentifiesWithLatestTimestamp.get(i));
+        }
         stateManager.setState(updatedState, scope);
     }
 
@@ -350,19 +359,27 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
                 // Attempt to retrieve state from the state manager if a last 
listing was not yet established or
                 // if just elected the primary node
                 final StateMap stateMap = 
context.getStateManager().getState(getStateScope(context));
-                final String latestListedEntryTimestampString = 
stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY);
-                final String lastProcessedLatestEntryTimestampString= 
stateMap.get(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY);
-                if (lastProcessedLatestEntryTimestampString != null) {
-                    this.lastProcessedLatestEntryTimestampMillis = 
Long.parseLong(lastProcessedLatestEntryTimestampString);
-                }
-                if (latestListedEntryTimestampString != null) {
-                    minTimestampToListMillis = 
Long.parseLong(latestListedEntryTimestampString);
-                    // If our determined timestamp is the same as that of our 
last listing, skip this execution as there are no updates
-                    if (minTimestampToListMillis == 
this.lastListedLatestEntryTimestampMillis) {
-                        context.yield();
-                        return;
-                    } else {
-                        this.lastListedLatestEntryTimestampMillis = 
minTimestampToListMillis;
+                latestIdentifiersProcessed.clear();
+                for (Map.Entry<String, String> state : 
stateMap.toMap().entrySet()) {
+                    final String k = state.getKey();
+                    final String v = state.getValue();
+                    if (v == null || v.isEmpty()) {
+                        continue;
+                    }
+
+                    if (LATEST_LISTED_ENTRY_TIMESTAMP_KEY.equals(k)) {
+                        minTimestampToListMillis = Long.parseLong(v);
+                        // If our determined timestamp is the same as that of 
our last listing, skip this execution as there are no updates
+                        if 
(minTimestampToListMillis.equals(this.lastListedLatestEntryTimestampMillis)) {
+                            context.yield();
+                            return;
+                        } else {
+                            this.lastListedLatestEntryTimestampMillis = 
minTimestampToListMillis;
+                        }
+                    } else if 
(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY.equals(k)) {
+                        this.lastProcessedLatestEntryTimestampMillis = 
Long.parseLong(v);
+                    } else if (k.startsWith(IDENTIFIER_PREFIX)) {
+                        latestIdentifiersProcessed.add(v);
                     }
                 }
                 justElectedPrimaryNode = false;
@@ -405,7 +422,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
                 targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
             }
             // New entries are all those that occur at or after the associated 
timestamp
-            final boolean newEntry = minTimestampToListMillis == null || 
entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis > 
lastProcessedLatestEntryTimestampMillis;
+            final boolean newEntry = minTimestampToListMillis == null || 
entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis >= 
lastProcessedLatestEntryTimestampMillis;
 
             if (newEntry) {
                 List<T> entitiesForTimestamp = 
orderedEntries.get(entity.getTimestamp());
@@ -439,7 +456,10 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
                  *   - The latest listed entity timestamp is equal to the last 
processed time, meaning we handled those items originally passed over. No need 
to process it again.
                  */
                 final long  listingLagNanos = 
TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
-                if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos 
|| 
latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis))
 {
+                if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos
+                        || 
(latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)
+                            && 
orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream()
+                                    .allMatch(entity -> 
latestIdentifiersProcessed.contains(entity.getIdentifier())))) {
                     context.yield();
                     return;
                 }
@@ -456,9 +476,14 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
                 }
             }
 
+            for (Map.Entry<Long, List<T>> timestampEntities : 
orderedEntries.entrySet()) {
+                List<T> entities = timestampEntities.getValue();
+                if 
(timestampEntities.getKey().equals(lastProcessedLatestEntryTimestampMillis)) {
+                    // Filter out previously processed entities.
+                    entities = entities.stream().filter(entity -> 
!latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList());
+                }
 
-            for (List<T> timestampEntities : orderedEntries.values()) {
-                for (T entity : timestampEntities) {
+                for (T entity : entities) {
                     // Create the FlowFile for this path.
                     final Map<String, String> attributes = 
createAttributes(entity, context);
                     FlowFile flowFile = session.create();
@@ -476,6 +501,13 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
                 // If there have been files created, update the last timestamp 
we processed.
                 // Retrieving lastKey instead of using 
latestListedEntryTimestampThisCycleMillis is intentional here,
                 // because latestListedEntryTimestampThisCycleMillis might be 
removed if it's not old enough.
+                if 
(!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) {
+                    // If the latest timestamp at this cycle becomes different 
than the previous one, we need to clear identifiers.
+                    // If it didn't change, we need to add identifiers.
+                    latestIdentifiersProcessed.clear();
+                }
+                // Capture latestIdentifierProcessed.
+                
latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList()));
                 lastProcessedLatestEntryTimestampMillis = 
orderedEntries.lastKey();
                 getLogger().info("Successfully created listing with {} new 
objects", new Object[]{flowfilesCreated});
                 session.commit();
@@ -494,7 +526,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
                 // the distributed state cache, the node can continue to run 
(if it is primary node).
                 try {
                     lastListedLatestEntryTimestampMillis = 
latestListedEntryTimestampThisCycleMillis;
-                    persist(latestListedEntryTimestampThisCycleMillis, 
lastProcessedLatestEntryTimestampMillis, context.getStateManager(), 
getStateScope(context));
+                    persist(latestListedEntryTimestampThisCycleMillis, 
lastProcessedLatestEntryTimestampMillis, latestIdentifiersProcessed, 
context.getStateManager(), getStateScope(context));
                 } catch (final IOException ioe) {
                     getLogger().warn("Unable to save state due to {}. If NiFi 
is restarted before state is saved, or "
                         + "if another node begins executing this Processor, 
data duplication may occur.", ioe);
@@ -518,6 +550,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         lastListedLatestEntryTimestampMillis = null;
         lastProcessedLatestEntryTimestampMillis = 0L;
         lastRunTimeNanos = 0L;
+        latestIdentifiersProcessed.clear();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/e68ff153/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
index 69705f2..1ecbce7 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
@@ -17,6 +17,10 @@
 
 package org.apache.nifi.processor.util.list;
 
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MILLIS;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MINUTES;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_SECONDS;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION;
 import static org.junit.Assert.assertEquals;
 
 import java.io.File;
@@ -130,6 +134,8 @@ public class TestAbstractListProcessor {
 
         final long initialTimestamp = 
getCurrentTimestampMillis(targetPrecision);
 
+        setTargetSystemTimestampPrecision(targetPrecision);
+
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         proc.addEntity("name", "id", initialTimestamp);
         proc.addEntity("name", "id2", initialTimestamp);
@@ -182,6 +188,8 @@ public class TestAbstractListProcessor {
 
         final long initialTimestamp = 
getCurrentTimestampMillis(targetPrecision);
 
+        setTargetSystemTimestampPrecision(targetPrecision);
+
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         proc.addEntity("name", "id", initialTimestamp);
         proc.addEntity("name", "id2", initialTimestamp);
@@ -226,6 +234,20 @@ public class TestAbstractListProcessor {
         runner.clearTransferState();
     }
 
+    private void setTargetSystemTimestampPrecision(TimeUnit targetPrecision) {
+        switch (targetPrecision) {
+            case MINUTES:
+                runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, 
PRECISION_MINUTES);
+                break;
+            case SECONDS:
+                runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, 
PRECISION_SECONDS);
+                break;
+            case MILLISECONDS:
+                runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, 
PRECISION_MILLIS);
+                break;
+        }
+    }
+
     @Test
     public void testOnlyNewEntriesEmittedMillisPrecision() throws Exception {
         testOnlyNewEntriesEmitted(TimeUnit.MILLISECONDS);
@@ -257,6 +279,8 @@ public class TestAbstractListProcessor {
         final Map<String, String> preexistingState = new HashMap<>();
         
preexistingState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, 
Long.toString(initialTimestamp));
         
preexistingState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY,
 Long.toString(initialTimestamp));
+        preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", 
"id");
+        preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", 
"id2");
         runner.getStateManager().setState(preexistingState, Scope.CLUSTER);
 
         // run for the first time
@@ -324,6 +348,7 @@ public class TestAbstractListProcessor {
         // Ensure only timestamp is migrated
         
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, 
String.valueOf(initialTimestamp));
         
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY,
 String.valueOf(initialTimestamp));
+        expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", 
"id");
         runner.getStateManager().assertStateEquals(expectedState, 
Scope.CLUSTER);
     }
 
@@ -383,9 +408,10 @@ public class TestAbstractListProcessor {
 
         // Verify the state manager now maintains the associated state
         final Map<String, String> expectedState = new HashMap<>();
-        // Ensure only timestamp is migrated
+        // Ensure timestamp and identifies are migrated
         
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, 
"1492");
         
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY,
 "1492");
+        expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", 
"id");
         runner.getStateManager().assertStateEquals(expectedState, 
Scope.CLUSTER);
     }
 
@@ -462,10 +488,12 @@ public class TestAbstractListProcessor {
         assertEquals(2, stateMap.getVersion());
 
         final Map<String, String> map = stateMap.toMap();
-        // Ensure only timestamp is migrated
-        assertEquals(2, map.size());
+        // Ensure timestamp and identifiers are migrated
+        assertEquals(4, map.size());
         assertEquals(Long.toString(initialTimestamp), 
map.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
         assertEquals(Long.toString(initialTimestamp), 
map.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
+        assertEquals("id", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + 
".0"));
+        assertEquals("id2", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + 
".1"));
 
         proc.addEntity("new name", "new id", initialTimestamp + 1);
         runner.run();
@@ -476,10 +504,11 @@ public class TestAbstractListProcessor {
         StateMap updatedStateMap = 
runner.getStateManager().getState(Scope.CLUSTER);
         assertEquals(3, updatedStateMap.getVersion());
 
-        assertEquals(2, updatedStateMap.toMap().size());
+        assertEquals(3, updatedStateMap.toMap().size());
         assertEquals(Long.toString(initialTimestamp + 1), 
updatedStateMap.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
         // Processed timestamp is now caught up
         assertEquals(Long.toString(initialTimestamp + 1), 
updatedStateMap.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
+        assertEquals("new id", 
updatedStateMap.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0"));
     }
 
     private static class DistributedCache extends AbstractControllerService 
implements DistributedMapCacheClient {

http://git-wip-us.apache.org/repos/asf/nifi/blob/e68ff153/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
index d8797dc..96a4236 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.list.AbstractListProcessor;
 import org.apache.nifi.processors.standard.util.FTPTransfer;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
@@ -41,6 +42,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 
@@ -201,7 +203,7 @@ public class TestFTP {
     }
 
     @Test
-    public void basicFileList() throws IOException {
+    public void basicFileList() throws IOException, InterruptedException {
         FileSystem results = fakeFtpServer.getFileSystem();
 
         FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2");
@@ -217,10 +219,16 @@ public class TestFTP {
         runner.setProperty(FTPTransfer.PASSWORD, password);
         runner.setProperty(FTPTransfer.PORT, Integer.toString(ftpPort));
         runner.setProperty(ListFTP.REMOTE_PATH, "/");
+        // FakeFTPServer has timestamp precision in minutes.
+        // Specify milliseconds precision so that test does not need to wait 
for minutes.
+        runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, 
ListFile.PRECISION_MILLIS);
         runner.assertValid();
 
+        // Ensure wait for enough lag time.
+        
Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS)
 * 2);
         runner.run();
 
+        runner.assertTransferCount(FetchFTP.REL_SUCCESS, 1);
         final MockFlowFile retrievedFile = 
runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0);
         runner.assertAllFlowFilesContainAttribute("ftp.remote.host");
         runner.assertAllFlowFilesContainAttribute("ftp.remote.port");

http://git-wip-us.apache.org/repos/asf/nifi/blob/e68ff153/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index 1b5b2a4..bf2755b 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -33,8 +33,10 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -646,6 +648,44 @@ public class TestListFile {
         assertEquals(false, processor.isListingResetNecessary(new 
PropertyDescriptor.Builder().name("x").build()));
     }
 
+    private void makeTestFile(final String name, final long millis, final 
Map<String, Long> fileTimes) throws IOException {
+        final File file = new File(TESTDIR + name);
+        assertTrue(file.createNewFile());
+        assertTrue(file.setLastModified(millis));
+        fileTimes.put(file.getName(), file.lastModified());
+    }
+
+    @Test
+    public void testFilterRunMidFileWrites() throws Exception {
+        final Map<String, Long> fileTimes = new HashMap<>();
+
+        runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
+
+        makeTestFile("/batch1-age3.txt", time3millis, fileTimes);
+        makeTestFile("/batch1-age4.txt", time4millis, fileTimes);
+        makeTestFile("/batch1-age5.txt", time5millis, fileTimes);
+
+        // check files
+        runNext();
+
+        runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
+        assertEquals(3, 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());
+
+        // should be picked since it's newer than age3
+        makeTestFile("/batch2-age2.txt", time2millis, fileTimes);
+        // should be picked even if it has the same age3 timestamp, because it 
wasn't there at the previous cycle.
+        makeTestFile("/batch2-age3.txt", time3millis, fileTimes);
+        // should be ignored since it's older than age3
+        makeTestFile("/batch2-age4.txt", time4millis, fileTimes);
+
+        runNext();
+
+        runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 2);
+        assertEquals(2, 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());
+    }
+
     /*
      * HFS+, default for OS X, only has granularity to one second, 
accordingly, we go back in time to establish consistent test cases
      *

Reply via email to