Repository: nifi
Updated Branches:
  refs/heads/master 8c488d7e8 -> 56f79e1e8


NIFI-1588: Reworked how ListHDFS stores state so that only 2 timestamps are 
stored and no paths

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/56f79e1e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/56f79e1e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/56f79e1e

Branch: refs/heads/master
Commit: 56f79e1e8560ba9343b4da072673fcfe894d156b
Parents: 8c488d7
Author: Mark Payne <[email protected]>
Authored: Thu Mar 10 13:44:23 2016 -0500
Committer: joewitt <[email protected]>
Committed: Mon Mar 14 15:59:24 2016 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/hadoop/ListHDFS.java | 274 +++++++++++++------
 .../nifi/processors/hadoop/TestListHDFS.java    |  87 ++++--
 .../standard/TestAbstractListProcessor.java     |  24 ++
 3 files changed, 278 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/56f79e1e/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index b300b88..2100f48 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -19,12 +19,14 @@ package org.apache.nifi.processors.hadoop;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Date;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -57,6 +59,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.hadoop.util.HDFSListing;
 import org.apache.nifi.processors.hadoop.util.StringSerDe;
+import org.apache.nifi.processors.hadoop.util.HDFSListing.StateKeys;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
@@ -119,8 +122,15 @@ public class ListHDFS extends AbstractHadoopProcessor {
         .description("All FlowFiles are transferred to this relationship")
         .build();
 
-    private volatile Long lastListingTime = null;
-    private volatile Set<Path> latestPathsListed = new HashSet<>();
+    private volatile long latestTimestampListed = -1L;
+    private volatile long latestTimestampEmitted = -1L;
+    private volatile boolean electedPrimaryNodeSinceLastIteration = false;
+    private volatile long lastRunTimestamp = -1L;
+
+    static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
+    static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
+
+    static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -155,12 +165,11 @@ public class ListHDFS extends AbstractHadoopProcessor {
         return getIdentifier() + ".lastListingTime." + directory;
     }
 
-
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
         if (isConfigurationRestored() && descriptor.equals(DIRECTORY)) {
-            lastListingTime = null; // clear lastListingTime so that we have 
to fetch new time
-            latestPathsListed = new HashSet<>();
+            latestTimestampEmitted = -1L;
+            latestTimestampListed = -1L;
         }
     }
 
@@ -170,6 +179,14 @@ public class ListHDFS extends AbstractHadoopProcessor {
         return mapper.readValue(jsonNode, HDFSListing.class);
     }
 
+    /**
+     * Transitions state from the Distributed cache service to the state 
manager. This will be
+     * removed in NiFi 1.x
+     *
+     * @param context the ProcessContext
+     * @throws IOException if unable to communicate with state manager or 
controller service
+     */
+    @Deprecated
     @OnScheduled
     public void moveStateToStateManager(final ProcessContext context) throws 
IOException {
         final StateManager stateManager = context.getStateManager();
@@ -184,6 +201,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
         }
     }
 
+    @Deprecated
     private HDFSListing getListingFromService(final ProcessContext context) 
throws IOException {
         final DistributedMapCacheClient client = 
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
         if (client == null) {
@@ -204,28 +222,149 @@ public class ListHDFS extends AbstractHadoopProcessor {
         }
     }
 
+    /**
+     * Restores state information from the 'old' style of storing state. This is deprecated and will no longer be supported
+     * in the 1.x NiFi baseline. Local timestamps are primitive longs that use -1 to mean "no local state".
+     *
+     * @param directory the directory that the listing was performed against (not currently used)
+     * @param remoteListing the remote listing, or null if no cluster-wide state was recovered
+     * @return the latest listing timestamp in effect after restoring (-1 if still unknown)
+     */
+    @Deprecated
+    private Long restoreTimestampFromOldStateFormat(final String directory, final HDFSListing remoteListing) {
+        // No cluster-wide state has been recovered. Just use whatever values we already have.
+        if (remoteListing == null) {
+            return latestTimestampListed;
+        }
+
+        // If our local timestamp is already later than the remote listing's timestamp, use our local info.
+        final long localTimestamp = latestTimestampListed;
+        final long remoteTimestamp = remoteListing.getLatestTimestamp().getTime();
+        if (localTimestamp > remoteTimestamp) {
+            return localTimestamp;
+        }
+
+        // Adopt the remote timestamp when we hold no local state (-1L; a primitive long is never null) or became primary.
+        if (localTimestamp == -1L || electedPrimaryNodeSinceLastIteration) {
+            this.latestTimestampListed = remoteTimestamp;
+            this.latestTimestampEmitted = remoteTimestamp;
+        }
+        return latestTimestampListed;
+    }
+
+
+    /**
+     * Determines which of the given FileStatus objects describe files that should be listed now. As a side effect,
+     * advances latestTimestampListed to the newest modification time seen (files ending in _COPYING_ are ignored).
+     * @param statuses the eligible FileStatus objects that we could potentially list
+     * @return the FileStatus objects to emit now; the newest timestamp group is held back one cycle unless it equals the previous listing timestamp
+     */
+    Set<FileStatus> determineListable(final Set<FileStatus> statuses) {
+        final long minTimestamp = this.latestTimestampListed; // snapshot: the loop below advances latestTimestampListed
+        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
+
+        // Group candidate entries by modification time; the TreeMap keeps keys sorted so the newest group is lastKey()
+        for (final FileStatus status : statuses) {
+            if (status.getPath().getName().endsWith("_COPYING_")) {
+                continue;
+            }
+
+            final long entityTimestamp = status.getModificationTime();
+
+            if (entityTimestamp > latestTimestampListed) {
+                latestTimestampListed = entityTimestamp;
+            }
+
+            // New entries are at or after the previous listing timestamp and strictly newer than anything already emitted
+            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
+
+            if (newEntry) {
+                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
+                if (entitiesForTimestamp == null) {
+                    entitiesForTimestamp = new ArrayList<FileStatus>();
+                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
+                }
+                entitiesForTimestamp.add(status);
+            }
+        }
+
+        final Set<FileStatus> toList = new HashSet<>();
+
+        if (orderedEntries.size() > 0) {
+            long latestListingTimestamp = orderedEntries.lastKey();
+
+            // If the newest candidate timestamp equals the previous listing timestamp, no newer files have appeared
+            // since the last iteration, so the group held back then is emitted now rather than being starved
+            if (latestListingTimestamp == minTimestamp) {
+                // NOTE(review): every entry in orderedEntries is newer than latestTimestampEmitted (see newEntry),
+                // so this early return looks unreachable unless that field changes concurrently
+                if (latestListingTimestamp == latestTimestampEmitted) {
+                    return Collections.emptySet();
+                }
+            } else {
+                // Otherwise hold back the newest group for one cycle, since more files with that same timestamp may still appear (avoids missing data)
+                orderedEntries.remove(latestListingTimestamp);
+            }
+
+            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
+                for (FileStatus status : timestampEntities) {
+                    toList.add(status);
+                }
+            }
+        }
+
+        return toList;
+    }
+
+
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        // We have to ensure that we don't continually perform listings, 
because if we perform two listings within
+        // the same millisecond, our algorithm for comparing timestamps will 
not work. So we ensure here that we do
+        // not let that happen.
+        final long now = System.nanoTime();
+        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
+            lastRunTimestamp = now;
+            context.yield();
+            return;
+        }
+        lastRunTimestamp = now;
+
+        final String directory = context.getProperty(DIRECTORY).getValue();
+
         // Ensure that we are using the latest listing information before we 
try to perform a listing of HDFS files.
-        Long minTimestamp = null;
         try {
-            final HDFSListing stateListing;
             final StateMap stateMap = 
context.getStateManager().getState(Scope.CLUSTER);
             if (stateMap.getVersion() == -1L) {
-                stateListing = null;
-                latestPathsListed = new HashSet<>();
-                lastListingTime = null;
+                latestTimestampEmitted = -1L;
+                latestTimestampListed = -1L;
+                getLogger().debug("Found no state stored");
             } else {
-                final Map<String, String> stateValues = stateMap.toMap();
-                stateListing = HDFSListing.fromMap(stateValues);
+                // Determine if state is stored in the 'new' format or the 
'old' format
+                final String emittedString = 
stateMap.get(EMITTED_TIMESTAMP_KEY);
+                if (emittedString == null && stateMap.get(StateKeys.TIMESTAMP) 
!= null) {
+                    // state is stored in the old format with XML
+                    final Map<String, String> stateValues = stateMap.toMap();
+                    final HDFSListing stateListing = 
HDFSListing.fromMap(stateValues);
+                    getLogger().debug("Found old-style state stored");
+                    restoreTimestampFromOldStateFormat(directory, 
stateListing);
+                } else if (emittedString == null) {
+                    latestTimestampEmitted = -1L;
+                    latestTimestampListed = -1L;
+                    getLogger().debug("Found no recognized state keys; 
assuming no relevant state and resetting listing/emitted time to -1");
+                } else {
+                    // state is stored in the new format, using just two 
timestamps
+                    latestTimestampEmitted = Long.parseLong(emittedString);
+                    final String listingTimestmapString = 
stateMap.get(LISTING_TIMESTAMP_KEY);
+                    if (listingTimestmapString != null) {
+                        latestTimestampListed = 
Long.parseLong(listingTimestmapString);
+                    }
 
-                if (stateListing != null) {
-                    latestPathsListed = stateListing.toPaths();
-                    lastListingTime = minTimestamp = 
stateListing.getLatestTimestamp().getTime();
+                    getLogger().debug("Found new-style state stored, latesting 
timestamp emitted = {}, latest listed = {}",
+                        new Object[] {latestTimestampEmitted, 
latestTimestampListed});
                 }
             }
-
         } catch (final IOException ioe) {
             getLogger().error("Failed to retrieve timestamp of last listing 
from Distributed Cache Service. Will not perform listing until this is 
accomplished.");
             context.yield();
@@ -234,80 +373,51 @@ public class ListHDFS extends AbstractHadoopProcessor {
 
         // Pull in any file that is newer than the timestamp that we have.
         final FileSystem hdfs = getFileSystem();
-        final String directory = context.getProperty(DIRECTORY).getValue();
         final boolean recursive = 
context.getProperty(RECURSE_SUBDIRS).asBoolean();
         final Path rootPath = new Path(directory);
 
-        int listCount = 0;
-        Long latestListingModTime = null;
         final Set<FileStatus> statuses;
         try {
             statuses = getStatuses(rootPath, recursive, hdfs);
-            for ( final FileStatus status : statuses ) {
-                // don't get anything where the last modified timestamp is 
equal to our current timestamp.
-                // if we do, then we run the risk of multiple files having the 
same last mod date but us only
-                // seeing a portion of them.
-                // I.e., there could be 5 files with last mod date = (now). 
But if we do the listing now, maybe
-                // only 2 exist and 3 more will exist later in this 
millisecond. So we ignore anything with a
-                // modified date not before the current time.
-                final long fileModTime = status.getModificationTime();
-
-                // we only want the file if its timestamp is later than the 
minTimestamp or equal to and we didn't pull it last time.
-                // Also, HDFS creates files with the suffix _COPYING_ when 
they are being written - we want to ignore those.
-                boolean fetch = 
!status.getPath().getName().endsWith("_COPYING_")
-                        && (minTimestamp == null || fileModTime > minTimestamp 
|| (fileModTime == minTimestamp && 
!latestPathsListed.contains(status.getPath())));
-
-                // Create the FlowFile for this path.
-                if ( fetch ) {
-                    final Map<String, String> attributes = 
createAttributes(status);
-                    FlowFile flowFile = session.create();
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                    session.transfer(flowFile, REL_SUCCESS);
-                    listCount++;
-
-                    if ( latestListingModTime == null || fileModTime > 
latestListingModTime ) {
-                        latestListingModTime = fileModTime;
-                    }
-                }
-            }
+            getLogger().debug("Found a total of {} files in HDFS", new 
Object[] {statuses.size()});
         } catch (final IOException ioe) {
             getLogger().error("Failed to perform listing of HDFS due to {}", 
new Object[] {ioe});
             return;
         }
 
-        if ( listCount > 0 ) {
-            getLogger().info("Successfully created listing with {} new files 
from HDFS", new Object[] {listCount});
-            session.commit();
+        final Set<FileStatus> listable = determineListable(statuses);
+        getLogger().debug("Of the {} files found in HDFS, {} are listable", 
new Object[] {statuses.size(), listable.size()});
 
-            // We have performed a listing and pushed the FlowFiles out.
-            // Now, we need to persist state about the Last Modified timestamp 
of the newest file
-            // that we pulled in. We do this in order to avoid pulling in the 
same file twice.
-            // However, we want to save the state both locally and remotely.
-            // We store the state remotely so that if a new Primary Node is 
chosen, it can pick up where the
-            // previously Primary Node left off.
-            final HDFSListing latestListing = 
createListing(latestListingModTime, statuses);
-
-            try {
-                context.getStateManager().setState(latestListing.toMap(), 
Scope.CLUSTER);
-            } catch (final IOException ioe) {
-                getLogger().warn("Failed to save cluster-wide state. If NiFi 
is restarted, data duplication may occur", ioe);
-            }
+        for (final FileStatus status : listable) {
+            final Map<String, String> attributes = createAttributes(status);
+            FlowFile flowFile = session.create();
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            session.transfer(flowFile, REL_SUCCESS);
 
-            lastListingTime = latestListingModTime;
-            latestPathsListed.clear();
-            for (final FileStatus status : statuses) {
-                latestPathsListed.add(status.getPath());
+            final long fileModTime = status.getModificationTime();
+            if (fileModTime > latestTimestampEmitted) {
+                latestTimestampEmitted = fileModTime;
             }
+        }
+
+        final int listCount = listable.size();
+        if ( listCount > 0 ) {
+            getLogger().info("Successfully created listing with {} new files 
from HDFS", new Object[] {listCount});
+            session.commit();
         } else {
             getLogger().debug("There is no data to list. Yielding.");
             context.yield();
+        }
 
-            // lastListingTime = 0 so that we don't continually poll the 
distributed cache / local file system
-            if ( lastListingTime == null ) {
-                lastListingTime = 0L;
-            }
+        final Map<String, String> updatedState = new HashMap<>(1);
+        updatedState.put(LISTING_TIMESTAMP_KEY, 
String.valueOf(latestTimestampListed));
+        updatedState.put(EMITTED_TIMESTAMP_KEY, 
String.valueOf(latestTimestampEmitted));
+        getLogger().debug("New state map: {}", new Object[] {updatedState});
 
-            return;
+        try {
+            context.getStateManager().setState(updatedState, Scope.CLUSTER);
+        } catch (final IOException ioe) {
+            getLogger().warn("Failed to save cluster-wide state. If NiFi is 
restarted, data duplication may occur", ioe);
         }
     }
 
@@ -334,20 +444,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
         return statusSet;
     }
 
-    private HDFSListing createListing(final long latestListingModTime, final 
Set<FileStatus> statuses) {
-        final Set<String> paths = new HashSet<>();
-        for (final FileStatus status : statuses) {
-            final String path = status.getPath().toUri().toString();
-            paths.add(path);
-        }
-
-        final HDFSListing listing = new HDFSListing();
-        listing.setLatestTimestamp(new Date(latestListingModTime));
-        listing.setMatchingPaths(paths);
-
-        return listing;
-    }
-
     private String getAbsolutePath(final Path path) {
         final Path parent = path.getParent();
         final String prefix = (parent == null || parent.getName().equals("")) 
? "" : getAbsolutePath(parent);
@@ -373,19 +469,19 @@ public class ListHDFS extends AbstractHadoopProcessor {
 
     private String getPerms(final FsAction action) {
         final StringBuilder sb = new StringBuilder();
-        if ( action.implies(FsAction.READ) ) {
+        if (action.implies(FsAction.READ)) {
             sb.append("r");
         } else {
             sb.append("-");
         }
 
-        if ( action.implies(FsAction.WRITE) ) {
+        if (action.implies(FsAction.WRITE)) {
             sb.append("w");
         } else {
             sb.append("-");
         }
 
-        if ( action.implies(FsAction.EXECUTE) ) {
+        if (action.implies(FsAction.EXECUTE)) {
             sb.append("x");
         } else {
             sb.append("-");

http://git-wip-us.apache.org/repos/asf/nifi/blob/56f79e1e/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index 6c944c8..7a77f06 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -17,8 +17,6 @@
 package org.apache.nifi.processors.hadoop;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -31,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -74,9 +73,14 @@ public class TestListHDFS {
     }
 
     @Test
-    public void testListingHasCorrectAttributes() {
+    public void testListingHasCorrectAttributes() throws InterruptedException {
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
 
+        // first iteration will not pick up files because it has to instead 
check timestamps.
+        // We must then wait long enough to ensure that the listing can be 
performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -87,12 +91,17 @@ public class TestListHDFS {
 
 
     @Test
-    public void testRecursive() {
+    public void testRecursive() throws InterruptedException {
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
 
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
         proc.fileSystem.addFileStatus(new Path("/test/testDir"), new 
FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/1.txt")));
 
+        // first iteration will not pick up files because it has to instead 
check timestamps.
+        // We must then wait long enough to ensure that the listing can be 
performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
@@ -113,13 +122,18 @@ public class TestListHDFS {
     }
 
     @Test
-    public void testNotRecursive() {
+    public void testNotRecursive() throws InterruptedException {
         runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
 
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
         proc.fileSystem.addFileStatus(new Path("/test/testDir"), new 
FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/1.txt")));
 
+        // first iteration will not pick up files because it has to instead 
check timestamps.
+        // We must then wait long enough to ensure that the listing can be 
performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -131,9 +145,14 @@ public class TestListHDFS {
 
 
     @Test
-    public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() throws 
IOException {
+    public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() throws 
IOException, InterruptedException {
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 1999L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
 
+        // first iteration will not pick up files because it has to instead 
check timestamps.
+        // We must then wait long enough to ensure that the listing can be 
performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -145,7 +164,7 @@ public class TestListHDFS {
         runner.clearTransferState();
 
         // add new file to pull
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 1999L, 0L, create777(), "owner", "group", new 
Path("/test/testFile2.txt")));
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 2000L, 0L, create777(), "owner", "group", new 
Path("/test/testFile2.txt")));
 
         // cause calls to service to fail
         service.failOnCalls = true;
@@ -172,23 +191,55 @@ public class TestListHDFS {
 
         service.failOnCalls = false;
         runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, false);
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
+
+        runner.run();
 
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+        Map<String, String> newState = 
runner.getStateManager().getState(Scope.CLUSTER).toMap();
+        assertEquals("2000", newState.get(ListHDFS.LISTING_TIMESTAMP_KEY));
+        assertEquals("1999", newState.get(ListHDFS.EMITTED_TIMESTAMP_KEY));
+
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run();
+
+        newState = runner.getStateManager().getState(Scope.CLUSTER).toMap();
+        assertEquals("2000", newState.get(ListHDFS.LISTING_TIMESTAMP_KEY));
+        assertEquals("2000", newState.get(ListHDFS.EMITTED_TIMESTAMP_KEY));
+
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testOnlyNewestEntriesHeldBack() throws InterruptedException {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 8L, 0L, create777(), "owner", "group", new 
Path("/test/testFile2.txt")));
+
+        // this is a directory, so it won't be counted toward the entries
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
true, 1, 1L, 8L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new 
FileStatus(1L, false, 1, 1L, 100L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/1.txt")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new 
FileStatus(1L, false, 1, 1L, 100L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/2.txt")));
+
+        // The first iteration should pick up 2 files with the smaller 
timestamps.
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
 
-        // ensure state saved
-        runner.getStateManager().assertStateSet(Scope.CLUSTER);
-        final Map<String, String> newState = 
runner.getStateManager().getState(Scope.CLUSTER).toMap();
-        assertEquals(3, newState.size());
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        // Next iteration should pick up the other 2 files, since nothing else 
was added.
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 4);
+
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new 
FileStatus(1L, false, 1, 1L, 110L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/3.txt")));
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 4);
 
-        final String path0 = newState.get("path.0");
-        final String path1 = newState.get("path.1");
-        assertTrue(path0.equals("/test/testFile.txt") || 
path0.equals("/test/testFile2.txt"));
-        assertTrue(path1.equals("/test/testFile.txt") || 
path1.equals("/test/testFile2.txt"));
-        assertNotSame(path0, path1);
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
 
-        final Long timestamp = Long.parseLong(newState.get("timestamp"));
-        assertEquals(1999L, timestamp.longValue());
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 5);
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/56f79e1e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
index a85ff2a..68ed22f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
@@ -54,6 +54,30 @@ public class TestAbstractListProcessor {
     public final TemporaryFolder testFolder = new TemporaryFolder();
 
     @Test
+    public void testPreviouslySkippedEntriesEmittedOnNextIteration() throws Exception {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.run();
+
+        final long initialTimestamp = System.currentTimeMillis();
+
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+
+        // First run that sees the entries: both share the newest timestamp, so they are held back to avoid write synchronization issues
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+
+        // Wait out twice the listing lag period so the next run is not an immediate re-run of the previous one
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2);
+
+        // Run again without introducing any new entries; the previously held-back entries should now be emitted
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+    }
+
+    @Test
     public void testOnlyNewEntriesEmitted() throws Exception {
         final ConcreteListProcessor proc = new ConcreteListProcessor();
         final TestRunner runner = TestRunners.newTestRunner(proc);

Reply via email to