Repository: nifi
Updated Branches:
  refs/heads/master ac9944cce -> c118e9623
NIFI-5000 - ListHDFS properly lists files from updated directory path Signed-off-by: Pierre Villard <[email protected]> This closes #2576. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c118e962 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c118e962 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c118e962 Branch: refs/heads/master Commit: c118e9623899a34613f0ac8a6f02eafe76df12ae Parents: ac9944c Author: zenfenan <[email protected]> Authored: Thu Mar 22 22:43:52 2018 +0530 Committer: Pierre Villard <[email protected]> Committed: Wed Apr 25 18:03:44 2018 +0200 ---------------------------------------------------------------------- .../apache/nifi/processors/hadoop/ListHDFS.java | 14 +++++++---- .../nifi/processors/hadoop/TestListHDFS.java | 25 ++++++++++++++++++++ 2 files changed, 34 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c118e962/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index d33fc2e..bd30ca1 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -143,7 +143,7 @@ public class ListHDFS extends AbstractHadoopProcessor { private volatile long latestTimestampListed = -1L; private volatile long latestTimestampEmitted = -1L; private volatile long lastRunTimestamp = -1L; - + private 
volatile boolean resetState = false; static final String LISTING_TIMESTAMP_KEY = "listing.timestamp"; static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp"; @@ -202,8 +202,7 @@ public class ListHDFS extends AbstractHadoopProcessor { public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { super.onPropertyModified(descriptor, oldValue, newValue); if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) { - latestTimestampEmitted = -1L; - latestTimestampListed = -1L; + this.resetState = true; } } @@ -283,8 +282,6 @@ public class ListHDFS extends AbstractHadoopProcessor { return toList; } - - @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { // We have to ensure that we don't continually perform listings, because if we perform two listings within @@ -302,6 +299,12 @@ public class ListHDFS extends AbstractHadoopProcessor { // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files. try { + if (resetState) { + getLogger().debug("Property has been modified. 
Resetting the state values - listing.timestamp and emitted.timestamp to -1L"); + context.getStateManager().clear(Scope.CLUSTER); + this.resetState = false; + } + final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER); if (stateMap.getVersion() == -1L) { latestTimestampEmitted = -1L; @@ -464,4 +467,5 @@ public class ListHDFS extends AbstractHadoopProcessor { } }; } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/c118e962/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java index f176a5f..fab28f3 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java @@ -381,6 +381,31 @@ public class TestListHDFS { runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2); } + @Test + public void testListAfterDirectoryChange() throws InterruptedException { + proc.fileSystem.addFileStatus(new Path("/test1"), new FileStatus(1L, false, 1, 1L, 100L,0L, create777(), "owner", "group", new Path("/test1/testFile-1_1.txt"))); + proc.fileSystem.addFileStatus(new Path("/test2"), new FileStatus(1L, false, 1, 1L, 150L,0L, create777(), "owner", "group", new Path("/test2/testFile-2_1.txt"))); + proc.fileSystem.addFileStatus(new Path("/test1"), new FileStatus(1L, false, 1, 1L, 200L,0L, create777(), "owner", "group", new Path("/test1/testFile-1_2.txt"))); + + runner.setProperty(ListHDFS.DIRECTORY, "/test1"); + + runner.run(); // Initial run, latest file from /test1 will 
be ignored + + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS)); + runner.run(); // Latest file i.e. testFile-1_2.txt from /test1 should also be picked up now + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2); + + runner.setProperty(ListHDFS.DIRECTORY, "/test2"); // Changing directory should reset the state + + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS)); + runner.run(); // Will ignore the files for this cycle + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2); + + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS)); + runner.run(); // Since state has been reset, testFile-2_1.txt from /test2 should be picked up + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3); + } + private FsPermission create777() { return new FsPermission((short) 0777);
