Repository: nifi
Updated Branches:
  refs/heads/master e4eda188b -> 095c04eda


NIFI-3213: ListFile do not skip obviously old files

Before this fix, files with the latest timestamp within a listing
iteration are always be held back one cycle no matter how old it is.

Signed-off-by: Andre F de Miranda <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/095c04ed
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/095c04ed
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/095c04ed

Branch: refs/heads/master
Commit: 095c04eda0c604a02c51df085ba67847448224c0
Parents: e4eda18
Author: Koji Kawamura <[email protected]>
Authored: Fri Dec 16 17:48:06 2016 +0900
Committer: Andre F de Miranda <[email protected]>
Committed: Sat Feb 18 00:56:46 2017 +1100

----------------------------------------------------------------------
 .../standard/AbstractListProcessor.java         |   4 +-
 .../standard/TestAbstractListProcessor.java     |  78 +++---
 .../nifi/processors/standard/TestListFile.java  | 254 ++++++-------------
 3 files changed, 110 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/095c04ed/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
index 9e1e1aa..eceee1d 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -336,6 +336,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         }
 
         final List<T> entityList;
+        final long currentListingTimestamp = System.nanoTime();
         try {
             // track of when this last executed for consideration of the lag 
nanos
             entityList = performListing(context, minTimestamp);
@@ -385,7 +386,8 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
                     context.yield();
                     return;
                 }
-            } else {
+
+            } else if (latestListingTimestamp >= currentListingTimestamp - 
LISTING_LAG_NANOS) {
                 // Otherwise, newest entries are held back one cycle to avoid 
issues in writes occurring exactly when the listing is being performed to avoid 
missing data
                 orderedEntries.remove(latestListingTimestamp);
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/095c04ed/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
index f8b59f5..9896396 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
@@ -57,6 +57,30 @@ public class TestAbstractListProcessor {
     public final TemporaryFolder testFolder = new TemporaryFolder();
 
     @Test
+    public void testAllExistingEntriesEmittedOnFirstIteration() throws 
Exception {
+        final long oldTimestamp = System.nanoTime() - 
(AbstractListProcessor.LISTING_LAG_NANOS * 2);
+
+        // These entries have existed before the processor runs at the first 
time.
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        proc.addEntity("name", "id", oldTimestamp);
+        proc.addEntity("name", "id2", oldTimestamp);
+
+        // First run, the above listed entries should be emitted since it has 
existed.
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        // Ensure we have covered the necessary lag period to avoid issues 
where the processor was immediately scheduled to run again
+        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+        // Run again without introducing any new entries
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+    }
+
+    @Test
     public void testPreviouslySkippedEntriesEmittedOnNextIteration() throws 
Exception {
         final ConcreteListProcessor proc = new ConcreteListProcessor();
         final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -71,6 +95,7 @@ public class TestAbstractListProcessor {
 
         // First run, the above listed entries would be skipped to avoid write 
synchronization issues
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
 
         // Ensure we have covered the necessary lag period to avoid issues 
where the processor was immediately scheduled to run again
         Thread.sleep(DEFAULT_SLEEP_MILLIS);
@@ -124,14 +149,7 @@ public class TestAbstractListProcessor {
         // Now a new file beyond the current time enters
         proc.addEntity("name", "id2", initialTimestamp + 1);
 
-        // Nothing occurs for the first iteration as it is withheld
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        // But it should now show up that the appropriate pause has been 
eclipsed
+        // It should show up
         runner.run();
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
         runner.clearTransferState();
@@ -187,14 +205,7 @@ public class TestAbstractListProcessor {
         // Now a new file beyond the current time enters
         proc.addEntity("name", "id2", initialTimestamp + 1);
 
-        // Nothing occurs for the first iteration as it is withheld
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        // But it should now show up that the appropriate pause has been 
eclipsed
+        // It should now show up
         runner.run();
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
         runner.clearTransferState();
@@ -209,14 +220,14 @@ public class TestAbstractListProcessor {
         runner.enableControllerService(cache);
         runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, 
"cache");
 
-        runner.run();
+        final long initialTimestamp = System.nanoTime();
 
-        proc.addEntity("name", "id", 1492L);
+        proc.addEntity("name", "id", initialTimestamp);
         runner.run();
 
         final Map<String, String> expectedState = new HashMap<>();
         // Ensure only timestamp is migrated
-        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
+        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, 
String.valueOf(initialTimestamp));
         expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "0");
         runner.getStateManager().assertStateEquals(expectedState, 
Scope.CLUSTER);
 
@@ -224,8 +235,8 @@ public class TestAbstractListProcessor {
 
         runner.run();
         // Ensure only timestamp is migrated
-        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
-        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, 
"1492");
+        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, 
String.valueOf(initialTimestamp));
+        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, 
String.valueOf(initialTimestamp));
         runner.getStateManager().assertStateEquals(expectedState, 
Scope.CLUSTER);
     }
 
@@ -328,14 +339,6 @@ public class TestAbstractListProcessor {
         runner.getStateManager().clear(Scope.CLUSTER);
         Assert.assertEquals("State is not empty for this component after 
clearing", 0, runner.getStateManager().getState(Scope.CLUSTER).toMap().size());
 
-
-        // As before, we are unsure of when these files were delivered 
relative to system time, and additional cycle(s) need to occur before transfer
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
         // Ensure the original files are now transferred again.
         runner.run();
         
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
@@ -390,8 +393,7 @@ public class TestAbstractListProcessor {
         proc.addEntity("new name", "new id", initialTimestamp + 1);
         runner.run();
 
-        // Verify that the new entry has not been emitted but it has triggered 
an updated state
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
         runner.clearTransferState();
 
         StateMap updatedStateMap = 
runner.getStateManager().getState(Scope.CLUSTER);
@@ -399,20 +401,6 @@ public class TestAbstractListProcessor {
 
         assertEquals(2, updatedStateMap.toMap().size());
         assertEquals(Long.toString(initialTimestamp + 1), 
updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
-        // Processed timestamp is lagging behind currently
-        assertEquals(Long.toString(initialTimestamp), 
updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
-        runner.clearTransferState();
-
-        updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER);
-        assertEquals(4, updatedStateMap.getVersion());
-
-        assertEquals(2, updatedStateMap.toMap().size());
-        assertEquals(Long.toString(initialTimestamp + 1), 
updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
         // Processed timestamp is now caught up
         assertEquals(Long.toString(initialTimestamp + 1), 
updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/095c04ed/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index a3e5a94..951aab0 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -89,6 +90,15 @@ public class TestListFile {
         }
     }
 
+    /**
+     * This method ensures runner clears transfer state,
+     * and sleeps the current thread for DEFAULT_SLEEP_MILLIS before executing 
runner.run().
+     */
+    private void runNext() throws InterruptedException {
+        runner.clearTransferState();
+        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+        runner.run();
+    }
 
     @Test
     public void testGetRelationships() throws Exception {
@@ -107,21 +117,18 @@ public class TestListFile {
 
     @Test
     public void testPerformListing() throws Exception {
+
+        runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
+        runNext();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+
         // create first file
         final File file1 = new File(TESTDIR + "/listing1.txt");
         assertTrue(file1.createNewFile());
         assertTrue(file1.setLastModified(time4millis));
 
         // process first file and set new timestamp
-        runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
-        runner.run();
-
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles1.size());
@@ -132,14 +139,7 @@ public class TestListFile {
         assertTrue(file2.setLastModified(time2millis));
 
         // process second file after timestamp
-        runner.clearTransferState();
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles2.size());
@@ -150,95 +150,88 @@ public class TestListFile {
         assertTrue(file3.setLastModified(time4millis));
 
         // process third file before timestamp
-        runner.clearTransferState();
-        runner.run();
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles3 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(0, successFiles3.size());
 
         // force state to reset and process all files
-        runner.clearTransferState();
         runner.removeProperty(ListFile.DIRECTORY);
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
-        runner.run();
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles4 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(2, successFiles4.size());
-
-        runner.clearTransferState();
+        assertEquals(3, successFiles4.size());
 
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 1);
+        runNext();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
     }
 
     @Test
     public void testFilterAge() throws Exception {
+
         final File file1 = new File(TESTDIR + "/age1.txt");
         assertTrue(file1.createNewFile());
-        assertTrue(file1.setLastModified(time0millis));
 
         final File file2 = new File(TESTDIR + "/age2.txt");
         assertTrue(file2.createNewFile());
-        assertTrue(file2.setLastModified(time2millis));
 
         final File file3 = new File(TESTDIR + "/age3.txt");
         assertTrue(file3.createNewFile());
-        assertTrue(file3.setLastModified(time4millis));
+
+        final Function<Boolean, Object> runNext = resetAges -> {
+            if (resetAges) {
+                resetAges();
+                assertTrue(file1.setLastModified(time0millis));
+                assertTrue(file2.setLastModified(time2millis));
+                assertTrue(file3.setLastModified(time4millis));
+            }
+            try {
+                runNext();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            return null;
+        };
 
         // check all files
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 2);
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+        runNext.apply(true);
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
 
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
-        final List<MockFlowFile> successFiles1 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(1, successFiles1.size());
+        // processor updates internal state, it shouldn't pick the same ones.
+        runNext.apply(false);
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
 
         // exclude oldest
-        runner.clearTransferState();
         runner.setProperty(ListFile.MIN_AGE, age0);
         runner.setProperty(ListFile.MAX_AGE, age3);
-        runner.run();
+        runNext.apply(true);
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(1, successFiles2.size());
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 1);
+        assertEquals(2, successFiles2.size());
+        assertEquals(file2.getName(), 
successFiles2.get(0).getAttribute("filename"));
+        assertEquals(file1.getName(), 
successFiles2.get(1).getAttribute("filename"));
 
         // exclude newest
-        runner.clearTransferState();
         runner.setProperty(ListFile.MIN_AGE, age1);
         runner.setProperty(ListFile.MAX_AGE, age5);
-        runner.run();
+        runNext.apply(true);
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles3 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(1, successFiles3.size());
+        assertEquals(2, successFiles3.size());
+        assertEquals(file3.getName(), 
successFiles3.get(0).getAttribute("filename"));
+        assertEquals(file2.getName(), 
successFiles3.get(1).getAttribute("filename"));
 
         // exclude oldest and newest
-        runner.clearTransferState();
         runner.setProperty(ListFile.MIN_AGE, age1);
         runner.setProperty(ListFile.MAX_AGE, age3);
-        runner.run();
+        runNext.apply(true);
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles4 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(0, successFiles4.size());
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+        assertEquals(1, successFiles4.size());
+        assertEquals(file2.getName(), 
successFiles4.get(0).getAttribute("filename"));
 
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 1);
     }
 
     @Test
@@ -273,68 +266,37 @@ public class TestListFile {
 
         // check all files
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
-
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(3, successFiles1.size());
 
         // exclude largest
-        runner.clearTransferState();
         runner.removeProperty(ListFile.MIN_AGE);
         runner.removeProperty(ListFile.MAX_AGE);
         runner.setProperty(ListFile.MIN_SIZE, "0 b");
         runner.setProperty(ListFile.MAX_SIZE, "7500 b");
-        runner.run();
-
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
-
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(2, successFiles2.size());
 
         // exclude smallest
-        runner.clearTransferState();
         runner.removeProperty(ListFile.MIN_AGE);
         runner.removeProperty(ListFile.MAX_AGE);
         runner.setProperty(ListFile.MIN_SIZE, "2500 b");
         runner.removeProperty(ListFile.MAX_SIZE);
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles3 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(2, successFiles3.size());
 
         // exclude oldest and newest
-        runner.clearTransferState();
         runner.removeProperty(ListFile.MIN_AGE);
         runner.removeProperty(ListFile.MAX_AGE);
         runner.setProperty(ListFile.MIN_SIZE, "2500 b");
         runner.setProperty(ListFile.MAX_SIZE, "7500 b");
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
-
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles4 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles4.size());
@@ -364,7 +326,6 @@ public class TestListFile {
         assertTrue(file2.setLastModified(now));
 
         // check all files
-        runner.clearTransferState();
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.FILE_FILTER, ".*");
         runner.removeProperty(ListFile.MIN_AGE);
@@ -372,25 +333,14 @@ public class TestListFile {
         runner.removeProperty(ListFile.MIN_SIZE);
         runner.removeProperty(ListFile.MAX_SIZE);
         runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "false");
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(2, successFiles1.size());
 
         // exclude hidden
-        runner.clearTransferState();
         runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "true");
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles2.size());
@@ -398,6 +348,7 @@ public class TestListFile {
 
     @Test
     public void testFilterFilePattern() throws Exception {
+
         final long now = getTestModifiedTime();
 
         final File file1 = new File(TESTDIR + "/file1-abc-apple.txt");
@@ -417,31 +368,21 @@ public class TestListFile {
         assertTrue(file4.setLastModified(now));
 
         // check all files
-        runner.clearTransferState();
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.FILE_FILTER, 
ListFile.FILE_FILTER.getDefaultValue());
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(4, successFiles1.size());
 
         // filter file on pattern
-        runner.clearTransferState();
+        // Modifying FILE_FILTER property reset listing status, so these files 
will be listed again.
         runner.setProperty(ListFile.FILE_FILTER, ".*-xyz-.*");
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+        runNext();
+        runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 2);
 
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
-        final List<MockFlowFile> successFiles2 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(2, successFiles2.size());
+        runNext();
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
     }
 
     @Test
@@ -474,40 +415,24 @@ public class TestListFile {
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.FILE_FILTER, 
ListFile.FILE_FILTER.getDefaultValue());
         runner.setProperty(ListFile.RECURSE, "true");
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+        runNext();
 
-        runner.run();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(4, successFiles1.size());
 
         // filter path on pattern subdir1
-        runner.clearTransferState();
         runner.setProperty(ListFile.PATH_FILTER, "subdir1");
         runner.setProperty(ListFile.RECURSE, "true");
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(3, successFiles2.size());
 
         // filter path on pattern subdir2
-        runner.clearTransferState();
         runner.setProperty(ListFile.PATH_FILTER, "subdir2");
         runner.setProperty(ListFile.RECURSE, "true");
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles3 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles3.size());
@@ -536,16 +461,9 @@ public class TestListFile {
         assertTrue(file3.setLastModified(now));
 
         // check all files
-        runner.clearTransferState();
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.RECURSE, "true");
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 3);
         final List<MockFlowFile> successFiles1 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         for (final MockFlowFile mff : successFiles1) {
@@ -570,17 +488,8 @@ public class TestListFile {
         assertEquals(3, successFiles1.size());
 
         // exclude hidden
-        runner.clearTransferState();
         runner.setProperty(ListFile.RECURSE, "false");
-        runner.run();
-
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
-
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles2.size());
@@ -603,16 +512,9 @@ public class TestListFile {
         assertTrue(file3.setLastModified(now));
 
         // check all files
-        runner.clearTransferState();
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.RECURSE, "true");
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
     }
@@ -630,16 +532,8 @@ public class TestListFile {
         String userName = System.getProperty("user.name");
 
         // validate the file transferred
-        runner.clearTransferState();
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
-        runner.run();
-        runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
-
+        runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = 
runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles1.size());

Reply via email to