This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5bd4bc5190 NIFI-11178: Improve ListHDFS performance, incremental loading refactor.
5bd4bc5190 is described below

commit 5bd4bc5190662f6443930fba4f01c0b3e813b263
Author: Lehel <[email protected]>
AuthorDate: Wed May 10 13:33:46 2023 +0200

    NIFI-11178: Improve ListHDFS performance, incremental loading refactor.
---
 .../nifi-hdfs-processors/pom.xml                   |   5 +
 .../apache/nifi/processors/hadoop/ListHDFS.java    | 695 ++++++---------------
 .../processors/hadoop/util/FileStatusIterable.java | 122 ++++
 .../processors/hadoop/util/FileStatusManager.java  |  55 ++
 .../nifi/processors/hadoop/util/FilterMode.java    |  84 +++
 .../hadoop/util/writer/FlowFileObjectWriter.java   |  75 +++
 .../hadoop/util/writer/HadoopFileStatusWriter.java | 113 ++++
 .../hadoop/util/writer/RecordObjectWriter.java     | 154 +++++
 .../nifi/processors/hadoop/TestListHDFS.java       | 581 +++++++----------
 .../hadoop/TestListHDFSPerformanceIT.java          | 155 +++++
 .../hadoop/util/TestFileStatusIterator.java        | 151 +++++
 11 files changed, 1313 insertions(+), 877 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index b6625b8353..dfe57097b7 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -154,6 +154,11 @@
             <groupId>org.glassfish.jaxb</groupId>
             <artifactId>jaxb-runtime</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 40abeb16f9..330506d14d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -17,12 +17,9 @@
 package org.apache.nifi.processors.hadoop;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@@ -36,50 +33,40 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.deprecation.log.DeprecationLogger;
-import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+import org.apache.nifi.processors.hadoop.util.FilterMode;
+import org.apache.nifi.processors.hadoop.util.writer.FlowFileObjectWriter;
+import org.apache.nifi.processors.hadoop.util.writer.HadoopFileStatusWriter;
+import org.apache.nifi.processors.hadoop.util.writer.RecordObjectWriter;
 import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-import java.io.File;
+
 import java.io.IOException;
-import java.io.OutputStream;
-import java.security.PrivilegedExceptionAction;
-import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_DIRECTORIES_AND_FILES;
 
 @PrimaryNodeOnly
 @TriggerSerially
@@ -87,189 +74,116 @@ import java.util.regex.Pattern;
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"hadoop", "HCFS", "HDFS", "get", "list", "ingest", "source", 
"filesystem"})
 @SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
-@CapabilityDescription("Retrieves a listing of files from HDFS. Each time a 
listing is performed, the files with the latest timestamp will be excluded "
-        + "and picked up during the next execution of the processor. This is 
done to ensure that we do not miss any files, or produce duplicates, in the "
-        + "cases where files with the same timestamp are written immediately 
before and after a single execution of the processor. For each file that is "
-        + "listed in HDFS, this processor creates a FlowFile that represents 
the HDFS file to be fetched in conjunction with FetchHDFS. This Processor is "
-        +  "designed to run on Primary Node only in a cluster. If the primary 
node changes, the new Primary Node will pick up where the previous node left "
-        +  "off without duplicating all of the data. Unlike GetHDFS, this 
Processor does not delete any data from HDFS.")
+@CapabilityDescription("Retrieves a listing of files from HDFS. For each file 
that is listed in HDFS, this processor creates a FlowFile that represents "
+        + "the HDFS file to be fetched in conjunction with FetchHDFS. This 
Processor is designed to run on Primary Node only in a cluster. If the primary "
+        + "node changes, the new Primary Node will pick up where the previous 
node left off without duplicating all of the data. Unlike GetHDFS, this "
+        + "Processor does not delete any data from HDFS.")
 @WritesAttributes({
-    @WritesAttribute(attribute="filename", description="The name of the file 
that was read from HDFS."),
-    @WritesAttribute(attribute="path", description="The path is set to the 
absolute path of the file's directory on HDFS. For example, if the Directory 
property is set to /tmp, "
-            + "then files picked up from /tmp will have the path attribute set 
to \"./\". If the Recurse Subdirectories property is set to true and a file is 
picked up "
-            + "from /tmp/abc/1/2/3, then the path attribute will be set to 
\"/tmp/abc/1/2/3\"."),
-    @WritesAttribute(attribute="hdfs.owner", description="The user that owns 
the file in HDFS"),
-    @WritesAttribute(attribute="hdfs.group", description="The group that owns 
the file in HDFS"),
-    @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp 
of when the file in HDFS was last modified, as milliseconds since midnight Jan 
1, 1970 UTC"),
-    @WritesAttribute(attribute="hdfs.length", description="The number of bytes 
in the file in HDFS"),
-    @WritesAttribute(attribute="hdfs.replication", description="The number of 
HDFS replicas for hte file"),
-    @WritesAttribute(attribute="hdfs.permissions", description="The 
permissions for the file in HDFS. This is formatted as 3 characters for the 
owner, "
-            + "3 for the group, and 3 for other users. For example rw-rw-r--")
+        @WritesAttribute(attribute = "filename", description = "The name of 
the file that was read from HDFS."),
+        @WritesAttribute(attribute = "path", description = "The path is set to 
the absolute path of the file's directory on HDFS. For example, if the 
Directory property is set to /tmp, "
+                + "then files picked up from /tmp will have the path attribute 
set to \"./\". If the Recurse Subdirectories property is set to true and a file 
is picked up "
+                + "from /tmp/abc/1/2/3, then the path attribute will be set to 
\"/tmp/abc/1/2/3\"."),
+        @WritesAttribute(attribute = "hdfs.owner", description = "The user 
that owns the file in HDFS"),
+        @WritesAttribute(attribute = "hdfs.group", description = "The group 
that owns the file in HDFS"),
+        @WritesAttribute(attribute = "hdfs.lastModified", description = "The 
timestamp of when the file in HDFS was last modified, as milliseconds since 
midnight Jan 1, 1970 UTC"),
+        @WritesAttribute(attribute = "hdfs.length", description = "The number 
of bytes in the file in HDFS"),
+        @WritesAttribute(attribute = "hdfs.replication", description = "The 
number of HDFS replicas for hte file"),
+        @WritesAttribute(attribute = "hdfs.permissions", description = "The 
permissions for the file in HDFS. This is formatted as 3 characters for the 
owner, "
+                + "3 for the group, and 3 for other users. For example 
rw-rw-r--")
 })
-@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of 
HDFS files, the latest timestamp of all the files listed and the latest "
-        + "timestamp of all the files transferred are both stored. This allows 
the Processor to list only files that have been added or modified after "
-        + "this date the next time that the Processor is run, without having 
to store all of the actual filenames/paths which could lead to performance "
-        + "problems. State is stored across the cluster so that this Processor 
can be run on Primary Node only and if a new Primary "
-        + "Node is selected, the new node can pick up where the previous node 
left off, without duplicating the data.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of 
HDFS files, the latest timestamp of all the files listed is stored. "
+        + "This allows the Processor to list only files that have been added 
or modified after this date the next time that the Processor is run, "
+        + "without having to store all of the actual filenames/paths which 
could lead to performance problems. State is stored across the cluster "
+        + "so that this Processor can be run on Primary Node only and if a new 
Primary Node is selected, the new node can pick up where the previous "
+        + "node left off, without duplicating the data.")
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 public class ListHDFS extends AbstractHadoopProcessor {
 
-    private static final RecordSchema RECORD_SCHEMA;
-    private static final String FILENAME = "filename";
-    private static final String PATH = "path";
-    private static final String IS_DIRECTORY = "directory";
-    private static final String SIZE = "size";
-    private static final String LAST_MODIFIED = "lastModified";
-    private static final String PERMISSIONS = "permissions";
-    private static final String OWNER = "owner";
-    private static final String GROUP = "group";
-    private static final String REPLICATION = "replication";
-    private static final String IS_SYM_LINK = "symLink";
-    private static final String IS_ENCRYPTED = "encrypted";
-    private static final String IS_ERASURE_CODED = "erasureCoded";
-
-    static {
-        final List<RecordField> recordFields = new ArrayList<>();
-        recordFields.add(new RecordField(FILENAME, 
RecordFieldType.STRING.getDataType(), false));
-        recordFields.add(new RecordField(PATH, 
RecordFieldType.STRING.getDataType(), false));
-        recordFields.add(new RecordField(IS_DIRECTORY, 
RecordFieldType.BOOLEAN.getDataType(), false));
-        recordFields.add(new RecordField(SIZE, 
RecordFieldType.LONG.getDataType(), false));
-        recordFields.add(new RecordField(LAST_MODIFIED, 
RecordFieldType.TIMESTAMP.getDataType(), false));
-        recordFields.add(new RecordField(PERMISSIONS, 
RecordFieldType.STRING.getDataType()));
-        recordFields.add(new RecordField(OWNER, 
RecordFieldType.STRING.getDataType()));
-        recordFields.add(new RecordField(GROUP, 
RecordFieldType.STRING.getDataType()));
-        recordFields.add(new RecordField(REPLICATION, 
RecordFieldType.INT.getDataType()));
-        recordFields.add(new RecordField(IS_SYM_LINK, 
RecordFieldType.BOOLEAN.getDataType()));
-        recordFields.add(new RecordField(IS_ENCRYPTED, 
RecordFieldType.BOOLEAN.getDataType()));
-        recordFields.add(new RecordField(IS_ERASURE_CODED, 
RecordFieldType.BOOLEAN.getDataType()));
-        RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
-    }
+    private static final String NON_HIDDEN_FILES_REGEX = "[^\\.].*";
 
     public static final PropertyDescriptor RECURSE_SUBDIRS = new 
PropertyDescriptor.Builder()
-        .name("Recurse Subdirectories")
-        .description("Indicates whether to list files from subdirectories of 
the HDFS directory")
-        .required(true)
-        .allowableValues("true", "false")
-        .defaultValue("true")
-        .build();
+            .name("Recurse Subdirectories")
+            .description("Indicates whether to list files from subdirectories 
of the HDFS directory")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
 
     public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
-        .name("record-writer")
-        .displayName("Record Writer")
-        .description("Specifies the Record Writer to use for creating the 
listing. If not specified, one FlowFile will be created for each entity that is 
listed. If the Record Writer is specified, " +
-            "all entities will be written to a single FlowFile.")
-        .required(false)
-        .identifiesControllerService(RecordSetWriterFactory.class)
-        .build();
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Record Writer to use for creating the 
listing. If not specified, one FlowFile will be created for each "
+                    + "entity that is listed. If the Record Writer is 
specified, all entities will be written to a single FlowFile.")
+            .required(false)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
 
     public static final PropertyDescriptor FILE_FILTER = new 
PropertyDescriptor.Builder()
-        .name("File Filter")
-        .description("Only files whose names match the given regular 
expression will be picked up")
-        .required(true)
-        .defaultValue("[^\\.].*")
-        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-        .build();
-
-    private static final String FILTER_MODE_DIRECTORIES_AND_FILES = 
"filter-mode-directories-and-files";
-    private static final String FILTER_MODE_FILES_ONLY = 
"filter-mode-files-only";
-    private static final String FILTER_MODE_FULL_PATH = 
"filter-mode-full-path";
-    static final AllowableValue FILTER_DIRECTORIES_AND_FILES_VALUE = new 
AllowableValue(FILTER_MODE_DIRECTORIES_AND_FILES,
-        "Directories and Files",
-        "Filtering will be applied to the names of directories and files.  If 
" + RECURSE_SUBDIRS.getDisplayName()
-                + " is set to true, only subdirectories with a matching name 
will be searched for files that match "
-                + "the regular expression defined in " + 
FILE_FILTER.getDisplayName() + ".");
-    static final AllowableValue FILTER_FILES_ONLY_VALUE = new 
AllowableValue(FILTER_MODE_FILES_ONLY,
-        "Files Only",
-        "Filtering will only be applied to the names of files.  If " + 
RECURSE_SUBDIRS.getDisplayName()
-                + " is set to true, the entire subdirectory tree will be 
searched for files that match "
-                + "the regular expression defined in " + 
FILE_FILTER.getDisplayName() + ".");
-    static final AllowableValue FILTER_FULL_PATH_VALUE = new 
AllowableValue(FILTER_MODE_FULL_PATH,
-        "Full Path",
-        "Filtering will be applied by evaluating the regular expression 
defined in " + FILE_FILTER.getDisplayName()
-                + " against the full path of files with and without the scheme 
and authority.  If "
-                + RECURSE_SUBDIRS.getDisplayName() + " is set to true, the 
entire subdirectory tree will be searched for files in which the full path of "
-                + "the file matches the regular expression defined in " + 
FILE_FILTER.getDisplayName() + ".  See 'Additional Details' for more 
information.");
+            .name("File Filter")
+            .description("Only files whose names match the given regular 
expression will be picked up")
+            .required(true)
+            .defaultValue(NON_HIDDEN_FILES_REGEX)
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
 
     public static final PropertyDescriptor FILE_FILTER_MODE = new 
PropertyDescriptor.Builder()
-        .name("file-filter-mode")
-        .displayName("File Filter Mode")
-        .description("Determines how the regular expression in  " + 
FILE_FILTER.getDisplayName() + " will be used when retrieving listings.")
-        .required(true)
-        .allowableValues(FILTER_DIRECTORIES_AND_FILES_VALUE, 
FILTER_FILES_ONLY_VALUE, FILTER_FULL_PATH_VALUE)
-        .defaultValue(FILTER_DIRECTORIES_AND_FILES_VALUE.getValue())
-        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-        .build();
-
-    public static final PropertyDescriptor MIN_AGE = new 
PropertyDescriptor.Builder()
-        .name("minimum-file-age")
-        .displayName("Minimum File Age")
-        .description("The minimum age that a file must be in order to be 
pulled; any file younger than this "
-                + "amount of time (based on last modification date) will be 
ignored")
-        .required(false)
-        .addValidator(StandardValidators.createTimePeriodValidator(0, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
-        .build();
-
-    public static final PropertyDescriptor MAX_AGE = new 
PropertyDescriptor.Builder()
-        .name("maximum-file-age")
-        .displayName("Maximum File Age")
-        .description("The maximum age that a file must be in order to be 
pulled; any file older than this "
-                + "amount of time (based on last modification date) will be 
ignored. Minimum value is 100ms.")
-        .required(false)
-        .addValidator(StandardValidators.createTimePeriodValidator(100, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
-        .build();
+            .name("file-filter-mode")
+            .displayName("File Filter Mode")
+            .description("Determines how the regular expression in  " + 
FILE_FILTER.getDisplayName() + " will be used when retrieving listings.")
+            .required(true)
+            .allowableValues(FilterMode.class)
+            .defaultValue(FILTER_DIRECTORIES_AND_FILES.getValue())
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MINIMUM_FILE_AGE = new 
PropertyDescriptor.Builder()
+            .name("minimum-file-age")
+            .displayName("Minimum File Age")
+            .description("The minimum age that a file must be in order to be 
pulled; any file younger than this "
+                    + "amount of time (based on last modification date) will 
be ignored")
+            .required(false)
+            .addValidator(StandardValidators.createTimePeriodValidator(0, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+            .build();
+
+    public static final PropertyDescriptor MAXIMUM_FILE_AGE = new 
PropertyDescriptor.Builder()
+            .name("maximum-file-age")
+            .displayName("Maximum File Age")
+            .description("The maximum age that a file must be in order to be 
pulled; any file older than this "
+                    + "amount of time (based on last modification date) will 
be ignored. Minimum value is 100ms.")
+            .required(false)
+            .addValidator(StandardValidators.createTimePeriodValidator(100, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+            .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-        .name("success")
-        .description("All FlowFiles are transferred to this relationship")
-        .build();
-
-    private static final DeprecationLogger deprecationLogger = 
DeprecationLoggerFactory.getLogger(ListHDFS.class);
-
-    private volatile long latestTimestampListed = -1L;
-    private volatile long latestTimestampEmitted = -1L;
-    private volatile long lastRunTimestamp = -1L;
-    private volatile boolean resetState = false;
-    static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
-    static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
-
-    static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
+            .name("success")
+            .description("All FlowFiles are transferred to this relationship")
+            .build();
+    public static final String LEGACY_EMITTED_TIMESTAMP_KEY = 
"emitted.timestamp";
+    public static final String LEGACY_LISTING_TIMESTAMP_KEY = 
"listing.timestamp";
+    public static final String LATEST_TIMESTAMP_KEY = "latest.timestamp";
+    public static final String LATEST_FILES_KEY = "latest.file.%d";
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.singleton(REL_SUCCESS);
     private Pattern fileFilterRegexPattern;
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        super.init(context);
-    }
+    private volatile boolean resetState = false;
 
     @Override
     protected void preProcessConfiguration(Configuration config, 
ProcessContext context) {
         super.preProcessConfiguration(config, context);
         // Since this processor is marked as INPUT_FORBIDDEN, the FILE_FILTER 
regex can be compiled here rather than during onTrigger processing
         fileFilterRegexPattern = 
Pattern.compile(context.getProperty(FILE_FILTER).getValue());
-
-    }
-
-    protected File getPersistenceFile() {
-        return new File("conf/state/" + getIdentifier());
     }
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> props = new ArrayList<>(properties);
-        props.add(DIRECTORY);
-        props.add(RECURSE_SUBDIRS);
-        props.add(RECORD_WRITER);
-        props.add(FILE_FILTER);
-        props.add(FILE_FILTER_MODE);
-        props.add(MIN_AGE);
-        props.add(MAX_AGE);
+        props.addAll(Arrays.asList(DIRECTORY, RECURSE_SUBDIRS, RECORD_WRITER, 
FILE_FILTER, FILE_FILTER_MODE, MINIMUM_FILE_AGE, MAXIMUM_FILE_AGE));
         return props;
     }
 
     @Override
     public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        return relationships;
+        return RELATIONSHIPS;
     }
 
     @Override
@@ -277,401 +191,148 @@ public class ListHDFS extends AbstractHadoopProcessor {
 
         final List<ValidationResult> problems = new 
ArrayList<>(super.customValidate(context));
 
-        final Long minAgeProp = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final Long maxAgeProp = 
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Long minAgeProp = 
context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Long maxAgeProp = 
context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
         final long minimumAge = (minAgeProp == null) ? 0L : minAgeProp;
         final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : 
maxAgeProp;
 
         if (minimumAge > maximumAge) {
-            problems.add(new 
ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
-                    .explanation(MIN_AGE.getDisplayName() + " cannot be 
greater than " + MAX_AGE.getDisplayName()).build());
+            problems.add(new 
ValidationResult.Builder().valid(false).subject("ListHDFS Configuration")
+                    .explanation(MINIMUM_FILE_AGE.getDisplayName() + " cannot 
be greater than " + MAXIMUM_FILE_AGE.getDisplayName()).build());
         }
-
         return problems;
     }
 
-    protected String getKey(final String directory) {
-        return getIdentifier() + ".lastListingTime." + directory;
-    }
-
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
         super.onPropertyModified(descriptor, oldValue, newValue);
         if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || 
descriptor.equals(FILE_FILTER))) {
-            this.resetState = true;
-        }
-    }
-
-    /**
-     * Determines which of the given FileStatus's describes a File that should 
be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could 
potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to 
list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, 
ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file 
modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : 
minAgeProp;
-        final Long maxAgeProp = 
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : 
maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - 
status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated 
timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && 
entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = 
orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), 
entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
+            resetState = true;
         }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries 
previously seen,
-            // another iteration has occurred without new files and special 
handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the 
last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid 
issues in writes occurring exactly when the listing is being performed to avoid 
missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) 
{
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
     }
 
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws 
IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state 
values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state 
values.");
             context.getStateManager().clear(Scope.CLUSTER);
-            this.resetState = false;
+            resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, 
because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will 
not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we 
try to perform a listing of HDFS files.
+        final long latestTimestamp;
+        final List<String> latestFiles;
         try {
             final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
+            final String latestTimestampString = 
stateMap.get(LATEST_TIMESTAMP_KEY);
+
+            final String legacyLatestListingTimestampString = 
stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+            final String legacyLatestEmittedTimestampString = 
stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+            if (legacyLatestListingTimestampString != null) {
+                final long legacyLatestListingTimestamp = 
Long.parseLong(legacyLatestListingTimestampString);
+                final long legacyLatestEmittedTimestamp = 
Long.parseLong(legacyLatestEmittedTimestampString);
+                latestTimestamp = legacyLatestListingTimestamp == 
legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 : 
legacyLatestListingTimestamp;
+                latestFiles = new ArrayList<>();
+                getLogger().debug("Transitioned from legacy state to new 
state. 'legacyLatestListingTimestamp': {}, 'legacyLatestEmittedTimeStamp': 
{}'," +
+                        "'latestTimestamp': {}", legacyLatestListingTimestamp, 
legacyLatestEmittedTimestamp, latestTimestamp);
+            } else if (latestTimestampString != null) {
+                latestTimestamp = Long.parseLong(latestTimestampString);
+                latestFiles = stateMap.toMap().entrySet().stream()
+                        .filter(entry -> 
entry.getKey().startsWith("latest.file"))
+                        .map(Map.Entry::getValue)
+                        .collect(Collectors.toList());
             } else {
-                // Determine if state is stored in the 'new' format or the 
'old' format
-                final String emittedString = 
stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; 
assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two 
timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = 
stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = 
Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting 
timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, 
latestTimestampListed});
-                }
+                latestTimestamp = 0L;
+                latestFiles = new ArrayList<>();
             }
-        } catch (final IOException ioe) {
+        } catch (IOException e) {
             getLogger().error("Failed to retrieve timestamp of last listing 
from the State Manager. Will not perform listing until this is accomplished.");
             context.yield();
             return;
         }
 
         // Pull in any file that is newer than the timestamp that we have.
-        final FileSystem hdfs = getFileSystem();
-        final boolean recursive = 
context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        String fileFilterMode = 
context.getProperty(FILE_FILTER_MODE).getValue();
+        try (final FileSystem hdfs = getFileSystem()) {
+            final boolean recursive = 
context.getProperty(RECURSE_SUBDIRS).asBoolean();
+            final PathFilter pathFilter = createPathFilter(context);
+            final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        final Set<FileStatus> statuses;
-        try {
+            final FileStatusManager fileStatusManager = new 
FileStatusManager(latestTimestamp, latestFiles);
             final Path rootPath = getNormalizedPath(context, DIRECTORY);
-            statuses = getStatuses(rootPath, recursive, hdfs, 
createPathFilter(context), fileFilterMode);
-            getLogger().debug("Found a total of {} files in HDFS", new 
Object[] {statuses.size()});
-        } catch (final IOException | IllegalArgumentException e) {
-            getLogger().error("Failed to perform listing of HDFS", e);
-            return;
-        } catch (final InterruptedException e) {
-            Thread.currentThread().interrupt();
-            getLogger().error("Interrupted while performing listing of HDFS", 
e);
-            return;
-        }
+            final FileStatusIterable fileStatuses = new 
FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
 
-        final Set<FileStatus> listable = determineListable(statuses, context);
-        getLogger().debug("Of the {} files found in HDFS, {} are listable", 
new Object[] {statuses.size(), listable.size()});
-
-        // Create FlowFile(s) for the listing, if there are any
-        if (!listable.isEmpty()) {
-            if (context.getProperty(RECORD_WRITER).isSet()) {
-                try {
-                    createRecords(listable, context, session);
-                } catch (final IOException | SchemaNotFoundException e) {
-                    getLogger().error("Failed to write listing of HDFS", e);
-                    return;
-                }
-            } else {
-                createFlowFiles(listable, session);
-            }
-        }
-
-        for (final FileStatus status : listable) {
-            final long fileModTime = status.getModificationTime();
-            if (fileModTime > latestTimestampEmitted) {
-                latestTimestampEmitted = fileModTime;
-            }
-        }
-
-        final Map<String, String> updatedState = new HashMap<>(1);
-        updatedState.put(LISTING_TIMESTAMP_KEY, 
String.valueOf(latestTimestampListed));
-        updatedState.put(EMITTED_TIMESTAMP_KEY, 
String.valueOf(latestTimestampEmitted));
-        getLogger().debug("New state map: {}", new Object[] {updatedState});
-
-        try {
-            session.setState(updatedState, Scope.CLUSTER);
-        } catch (final IOException ioe) {
-            getLogger().warn("Failed to save cluster-wide state. If NiFi is 
restarted, data duplication may occur", ioe);
-        }
-
-        final int listCount = listable.size();
-        if ( listCount > 0 ) {
-            getLogger().info("Successfully created listing with {} new files 
from HDFS", new Object[] {listCount});
-            session.commitAsync();
-        } else {
-            getLogger().debug("There is no data to list. Yielding.");
-            context.yield();
-        }
-    }
+            final Long minAgeProp = 
context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+            final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : 
minAgeProp;
+            final Long maxAgeProp = 
context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+            final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : 
maxAgeProp;
 
-    private void createFlowFiles(final Set<FileStatus> fileStatuses, final 
ProcessSession session) {
-        for (final FileStatus status : fileStatuses) {
-            final Map<String, String> attributes = createAttributes(status);
-            FlowFile flowFile = session.create();
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            session.transfer(flowFile, getSuccessRelationship());
-        }
-    }
-
-    private void createRecords(final Set<FileStatus> fileStatuses, final 
ProcessContext context, final ProcessSession session) throws IOException, 
SchemaNotFoundException {
-        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
-        FlowFile flowFile = session.create();
-        final WriteResult writeResult;
-        try (final OutputStream out = session.write(flowFile);
-             final RecordSetWriter recordSetWriter = 
writerFactory.createWriter(getLogger(), getRecordSchema(), out, 
Collections.emptyMap())) {
-
-            recordSetWriter.beginRecordSet();
-            for (final FileStatus fileStatus : fileStatuses) {
-                final Record record = createRecord(fileStatus);
-                recordSetWriter.write(record);
+            final HadoopFileStatusWriter writer;
+            if (writerFactory == null) {
+                writer = new FlowFileObjectWriter(session, fileStatuses, 
minimumAge, maximumAge, pathFilter, fileStatusManager, latestTimestamp, 
latestFiles);
+            } else {
+                writer = new RecordObjectWriter(session, fileStatuses, 
minimumAge, maximumAge, pathFilter, fileStatusManager, latestTimestamp,
+                        latestFiles, writerFactory, getLogger());
             }
 
-            writeResult = recordSetWriter.finishRecordSet();
-        }
+            writer.write();
 
-        final Map<String, String> attributes = new 
HashMap<>(writeResult.getAttributes());
-        attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
-        flowFile = session.putAllAttributes(flowFile, attributes);
-
-        session.transfer(flowFile, getSuccessRelationship());
-    }
-
-    private Record createRecord(final FileStatus fileStatus) {
-        final Map<String, Object> values = new HashMap<>();
-        values.put(FILENAME, fileStatus.getPath().getName());
-        values.put(PATH, getAbsolutePath(fileStatus.getPath().getParent()));
-        values.put(OWNER, fileStatus.getOwner());
-        values.put(GROUP, fileStatus.getGroup());
-        values.put(LAST_MODIFIED, new 
Timestamp(fileStatus.getModificationTime()));
-        values.put(SIZE, fileStatus.getLen());
-        values.put(REPLICATION, fileStatus.getReplication());
-
-        final FsPermission permission = fileStatus.getPermission();
-        final String perms = getPerms(permission.getUserAction()) + 
getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
-        values.put(PERMISSIONS, perms);
-
-        values.put(IS_DIRECTORY, fileStatus.isDirectory());
-        values.put(IS_SYM_LINK, fileStatus.isSymlink());
-        values.put(IS_ENCRYPTED, fileStatus.isEncrypted());
-        values.put(IS_ERASURE_CODED, fileStatus.isErasureCoded());
-
-        return new MapRecord(getRecordSchema(), values);
-    }
-
-    private RecordSchema getRecordSchema() {
-        return RECORD_SCHEMA;
-    }
+            getLogger().debug("Found a total of {} files in HDFS, {} are 
listed", fileStatuses.getTotalFileCount(), writer.getListedFileCount());
 
-    private Set<FileStatus> getStatuses(final Path path, final boolean 
recursive, final FileSystem hdfs, final PathFilter filter, String filterMode) 
throws IOException, InterruptedException {
-        final Set<FileStatus> statusSet = new HashSet<>();
-
-        getLogger().debug("Fetching listing for {}", new Object[] {path});
-        final FileStatus[] statuses;
-        if (isPostListingFilterNeeded(filterMode)) {
-            // For this filter mode, the filter is not passed to listStatus, 
so that directory names will not be
-            // filtered out when the listing is recursive.
-            statuses = 
getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> 
hdfs.listStatus(path));
-        } else {
-            statuses = 
getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> 
hdfs.listStatus(path, filter));
-        }
-
-        for ( final FileStatus status : statuses ) {
-            if ( status.isDirectory() ) {
-                if ( recursive ) {
-                    try {
-                        statusSet.addAll(getStatuses(status.getPath(), 
recursive, hdfs, filter, filterMode));
-                    } catch (final IOException ioe) {
-                        getLogger().error("Failed to retrieve HDFS listing for 
subdirectory {} due to {}; will continue listing others", new Object[] 
{status.getPath(), ioe});
-                    }
+            if (writer.getListedFileCount() > 0) {
+                final Map<String, String> updatedState = new HashMap<>();
+                updatedState.put(LATEST_TIMESTAMP_KEY, 
String.valueOf(fileStatusManager.getCurrentLatestTimestamp()));
+                final List<String> files = 
fileStatusManager.getCurrentLatestFiles();
+                for (int i = 0; i < files.size(); i++) {
+                    final String currentFilePath = files.get(i);
+                    updatedState.put(String.format(LATEST_FILES_KEY, i), 
currentFilePath);
                 }
+                getLogger().debug("New state map: {}", updatedState);
+                updateState(session, updatedState);
+
+                getLogger().info("Successfully created listing with {} new 
files from HDFS", writer.getListedFileCount());
             } else {
-                if (isPostListingFilterNeeded(filterMode)) {
-                    // Filtering explicitly performed here, since it was not 
able to be done when calling listStatus.
-                    if (filter.accept(status.getPath())) {
-                        statusSet.add(status);
-                    }
-                } else {
-                    statusSet.add(status);
-                }
+                getLogger().debug("There is no data to list. Yielding.");
+                context.yield();
             }
+        } catch (IOException e) {
+            throw new ProcessException("IO error occurred when closing HDFS 
file system", e);
         }
-
-        return statusSet;
-    }
-
-    /**
-     * Determines if filtering needs to be applied, after calling {@link 
FileSystem#listStatus(Path)}, based on the
-     * given filter mode.
-     * Filter modes that need to be able to search directories regardless of 
the given filter should return true.
-     * FILTER_MODE_FILES_ONLY and FILTER_MODE_FULL_PATH require that {@link 
FileSystem#listStatus(Path)} be invoked
-     * without a filter so that all directories can be traversed when 
filtering with these modes.
-     * FILTER_MODE_DIRECTORIES_AND_FILES should return false, since filtering 
can be applied directly with
-     * {@link FileSystem#listStatus(Path, PathFilter)} regardless of a 
recursive listing.
-     * @param filterMode the value of one of the defined AllowableValues 
representing filter modes
-     * @return true if results need to be filtered, false otherwise
-     */
-    private boolean isPostListingFilterNeeded(String filterMode) {
-        return filterMode.equals(FILTER_MODE_FILES_ONLY) || 
filterMode.equals(FILTER_MODE_FULL_PATH);
-    }
-
-    private String getAbsolutePath(final Path path) {
-        final Path parent = path.getParent();
-        final String prefix = (parent == null || parent.getName().equals("")) 
? "" : getAbsolutePath(parent);
-        return prefix + "/" + path.getName();
-    }
-
-    private Map<String, String> createAttributes(final FileStatus status) {
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put(CoreAttributes.FILENAME.key(), 
status.getPath().getName());
-        attributes.put(CoreAttributes.PATH.key(), 
getAbsolutePath(status.getPath().getParent()));
-
-        attributes.put(getAttributePrefix() + ".owner", status.getOwner());
-        attributes.put(getAttributePrefix() + ".group", status.getGroup());
-        attributes.put(getAttributePrefix() + ".lastModified", 
String.valueOf(status.getModificationTime()));
-        attributes.put(getAttributePrefix() + ".length", 
String.valueOf(status.getLen()));
-        attributes.put(getAttributePrefix() + ".replication", 
String.valueOf(status.getReplication()));
-
-        final FsPermission permission = status.getPermission();
-        final String perms = getPerms(permission.getUserAction()) + 
getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
-        attributes.put(getAttributePrefix() + ".permissions", perms);
-        return attributes;
-    }
-
-    private String getPerms(final FsAction action) {
-        final StringBuilder sb = new StringBuilder();
-        if (action.implies(FsAction.READ)) {
-            sb.append("r");
-        } else {
-            sb.append("-");
-        }
-
-        if (action.implies(FsAction.WRITE)) {
-            sb.append("w");
-        } else {
-            sb.append("-");
-        }
-
-        if (action.implies(FsAction.EXECUTE)) {
-            sb.append("x");
-        } else {
-            sb.append("-");
-        }
-
-        return sb.toString();
     }
 
     private PathFilter createPathFilter(final ProcessContext context) {
-        final String filterMode = 
context.getProperty(FILE_FILTER_MODE).getValue();
-        return path -> {
-            final boolean accepted;
-            if (FILTER_FULL_PATH_VALUE.getValue().equals(filterMode)) {
-                accepted = 
fileFilterRegexPattern.matcher(path.toString()).matches()
+        final FilterMode filterMode = 
FilterMode.forName(context.getProperty(FILE_FILTER_MODE).getValue());
+        final boolean recursive = 
context.getProperty(RECURSE_SUBDIRS).asBoolean();
+
+        switch (filterMode) {
+            case FILTER_MODE_FILES_ONLY:
+                return path -> 
fileFilterRegexPattern.matcher(path.getName()).matches();
+            case FILTER_MODE_FULL_PATH:
+                return path -> 
fileFilterRegexPattern.matcher(path.toString()).matches()
                         || 
fileFilterRegexPattern.matcher(Path.getPathWithoutSchemeAndAuthority(path).toString()).matches();
-            } else {
-                accepted =  
fileFilterRegexPattern.matcher(path.getName()).matches();
-            }
-            return accepted;
-        };
+            // FILTER_DIRECTORIES_AND_FILES
+            default:
+                return path -> 
Stream.of(Path.getPathWithoutSchemeAndAuthority(path).toString().split("/"))
+                        .skip(getPathSegmentsToSkip(recursive))
+                        .allMatch(v -> 
fileFilterRegexPattern.matcher(v).matches());
+        }
     }
 
-    protected Relationship getSuccessRelationship() {
-        return REL_SUCCESS;
+    private int getPathSegmentsToSkip(final boolean recursive) {
+        // We need to skip the first leading '/' of the path and, if the 
traversal is recursive,
+        // the filter will be applied only to the subdirectories.
+        return recursive ? 2 : 1;
     }
 
-    protected String getAttributePrefix() {
-        return "hdfs";
+    private void updateState(final ProcessSession session, final Map<String, 
String> newState) {
+        // In case of legacy state we update the state even if there are no 
listable files.
+        try {
+            session.setState(newState, Scope.CLUSTER);
+        } catch (IOException e) {
+            getLogger().warn("Failed to save cluster-wide state. If NiFi is 
restarted, data duplication may occur", e);
+        }
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterable.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterable.java
new file mode 100644
index 0000000000..67d64de94b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterable.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class FileStatusIterable implements Iterable<FileStatus> {
+    private final Path path;
+    private final boolean recursive;
+    private final FileSystem fileSystem;
+    private final UserGroupInformation userGroupInformation;
+    private long totalFileCount;
+
+    public FileStatusIterable(final Path path, final boolean recursive, final 
FileSystem fileSystem, final UserGroupInformation userGroupInformation) {
+        this.path = path;
+        this.recursive = recursive;
+        this.fileSystem = fileSystem;
+        this.userGroupInformation = userGroupInformation;
+    }
+
+    @Override
+    public Iterator<FileStatus> iterator() {
+        return new FileStatusIterator();
+    }
+
+    public long getTotalFileCount() {
+        return totalFileCount;
+    }
+
+    class FileStatusIterator implements Iterator<FileStatus> {
+
+        private static final String IO_ERROR_MESSAGE = "IO error occurred 
while iterating Hadoop File System";
+        private static final String THREAD_INTERRUPT_ERROR_MESSAGE = "Thread 
was interrupted while iterating Hadoop File System";
+
+        private final Deque<Path> dirPaths;
+        private FileStatus nextFileStatus;
+        private RemoteIterator<FileStatus> remoteIterator;
+
+        public FileStatusIterator() {
+            dirPaths = new ArrayDeque<>();
+            remoteIterator = getRemoteIterator(path);
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (nextFileStatus != null) {
+                return true;
+            }
+            try {
+                while (remoteIterator.hasNext() || !dirPaths.isEmpty()) {
+                    if (remoteIterator.hasNext()) {
+                        FileStatus fs = remoteIterator.next();
+                        if (fs.isDirectory()) {
+                            if (recursive) {
+                                dirPaths.push(fs.getPath());
+                            }
+                            // if not recursive, continue
+                        } else {
+                            nextFileStatus = fs;
+                            return true;
+                        }
+                    } else {
+                        remoteIterator = getRemoteIterator(dirPaths.pop());
+                    }
+                }
+                return false;
+            } catch (IOException e) {
+                throw new ProcessException(IO_ERROR_MESSAGE, e);
+            }
+        }
+
+        @Override
+        public FileStatus next() {
+            if (nextFileStatus == null) {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+            }
+            totalFileCount++;
+            final FileStatus nextFileStatus = this.nextFileStatus;
+            this.nextFileStatus = null;
+            return nextFileStatus;
+        }
+
+        private RemoteIterator<FileStatus> getRemoteIterator(final Path 
currentPath) {
+            try {
+                return 
userGroupInformation.doAs((PrivilegedExceptionAction<RemoteIterator<FileStatus>>)
 () -> fileSystem.listStatusIterator(currentPath));
+            } catch (IOException e) {
+                throw new ProcessException(IO_ERROR_MESSAGE, e);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new ProcessException(THREAD_INTERRUPT_ERROR_MESSAGE, e);
+            }
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusManager.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusManager.java
new file mode 100644
index 0000000000..b6730d7868
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusManager.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Keeps a list of the most recently modified file paths and the latest 
modification timestamp of the current run.
+ */
+public class FileStatusManager {
+
+    private final List<String> currentLatestFiles;
+    private long currentLatestTimestamp;
+
+    public FileStatusManager(final long initialLatestTimestamp, final 
List<String> initialLatestFiles) {
+        currentLatestTimestamp = initialLatestTimestamp;
+        currentLatestFiles = new ArrayList<>(initialLatestFiles);
+    }
+
+    public void update(final FileStatus status) {
+        if (status.getModificationTime() > currentLatestTimestamp) {
+            currentLatestTimestamp = status.getModificationTime();
+            currentLatestFiles.clear();
+            currentLatestFiles.add(status.getPath().toString());
+        } else if (status.getModificationTime() == currentLatestTimestamp) {
+            currentLatestFiles.add(status.getPath().toString());
+        }
+    }
+
+    public List<String> getCurrentLatestFiles() {
+        return Collections.unmodifiableList(currentLatestFiles);
+    }
+
+    public long getCurrentLatestTimestamp() {
+        return currentLatestTimestamp;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FilterMode.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FilterMode.java
new file mode 100644
index 0000000000..fed64ea7a2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FilterMode.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.nifi.components.DescribedValue;
+
+import java.util.stream.Stream;
+
+import static org.apache.nifi.processors.hadoop.ListHDFS.FILE_FILTER;
+import static org.apache.nifi.processors.hadoop.ListHDFS.RECURSE_SUBDIRS;
+
+public enum FilterMode implements DescribedValue {
+
+    FILTER_DIRECTORIES_AND_FILES(
+            "filter-mode-directories-and-files",
+            "Directories and Files",
+            "Filtering will be applied to the names of directories and files.  
If " + RECURSE_SUBDIRS.getDisplayName()
+                    + " is set to true, only subdirectories with a matching 
name will be searched for files that match "
+                    + "the regular expression defined in " + 
FILE_FILTER.getDisplayName() + "."
+    ),
+    FILTER_MODE_FILES_ONLY(
+            "filter-mode-files-only",
+            "Files Only",
+            "Filtering will only be applied to the names of files.  If " + 
RECURSE_SUBDIRS.getDisplayName()
+                    + " is set to true, the entire subdirectory tree will be 
searched for files that match "
+                    + "the regular expression defined in " + 
FILE_FILTER.getDisplayName() + "."
+    ),
+
+    FILTER_MODE_FULL_PATH(
+            "filter-mode-full-path",
+            "Full Path",
+            "Filtering will be applied by evaluating the regular expression 
defined in " + FILE_FILTER.getDisplayName()
+                    + " against the full path of files with and without the 
scheme and authority.  If "
+                    + RECURSE_SUBDIRS.getDisplayName() + " is set to true, the 
entire subdirectory tree will be searched for files in which the full path of "
+                    + "the file matches the regular expression defined in " + 
FILE_FILTER.getDisplayName() + ".  See 'Additional Details' for more 
information."
+    );
+
+    private final String value;
+    private final String displayName;
+    private final String description;
+
+    FilterMode(final String value, final String displayName, final String 
description) {
+        this.value = value;
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    public static FilterMode forName(String filterMode) {
+        return Stream.of(values())
+                .filter(fm -> fm.getValue().equalsIgnoreCase(filterMode))
+                .findFirst()
+                .orElseThrow(
+                        () -> new IllegalArgumentException("Invalid filter 
mode: " + filterMode));
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/FlowFileObjectWriter.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/FlowFileObjectWriter.java
new file mode 100644
index 0000000000..4b9b5608bf
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/FlowFileObjectWriter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.hadoop.ListHDFS;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FlowFileObjectWriter extends HadoopFileStatusWriter {
+
+    private static final String HDFS_ATTRIBUTE_PREFIX = "hdfs";
+
+    public FlowFileObjectWriter(final ProcessSession session,
+                                final FileStatusIterable fileStatuses,
+                                final long minimumAge,
+                                final long maximumAge,
+                                final PathFilter pathFilter,
+                                final FileStatusManager fileStatusManager,
+                                final long previousLatestModificationTime,
+                                final List<String> previousLatestFiles) {
+        super(session, fileStatuses, minimumAge, maximumAge, pathFilter, 
fileStatusManager, previousLatestModificationTime, previousLatestFiles);
+    }
+
+    @Override
+    public void write() {
+        for (FileStatus status : fileStatusIterable) {
+            if (determineListable(status)) {
+
+                final Map<String, String> attributes = 
createAttributes(status);
+                FlowFile flowFile = session.create();
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                session.transfer(flowFile, ListHDFS.REL_SUCCESS);
+
+                fileStatusManager.update(status);
+                fileCount++;
+            }
+        }
+    }
+
+    private Map<String, String> createAttributes(final FileStatus status) {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), 
status.getPath().getName());
+        attributes.put(CoreAttributes.PATH.key(), 
getAbsolutePath(status.getPath().getParent()));
+        attributes.put(HDFS_ATTRIBUTE_PREFIX + ".owner", status.getOwner());
+        attributes.put(HDFS_ATTRIBUTE_PREFIX + ".group", status.getGroup());
+        attributes.put(HDFS_ATTRIBUTE_PREFIX + ".lastModified", 
String.valueOf(status.getModificationTime()));
+        attributes.put(HDFS_ATTRIBUTE_PREFIX + ".length", 
String.valueOf(status.getLen()));
+        attributes.put(HDFS_ATTRIBUTE_PREFIX + ".replication", 
String.valueOf(status.getReplication()));
+        attributes.put(HDFS_ATTRIBUTE_PREFIX + ".permissions", 
getPermissionsString(status.getPermission()));
+        return attributes;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HadoopFileStatusWriter.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HadoopFileStatusWriter.java
new file mode 100644
index 0000000000..9adf0d3652
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HadoopFileStatusWriter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+
+import java.util.List;
+
+/**
+ * Abstract base class for common management of writing listed file statuses to records and to FlowFiles.
+ */
+public abstract class HadoopFileStatusWriter {
+
+    protected final ProcessSession session;
+    protected final FileStatusIterable fileStatusIterable;
+    protected final long minimumAge; // compared against a millisecond age in determineListable
+    protected final long maximumAge; // compared against a millisecond age in determineListable
+    protected final PathFilter pathFilter;
+    protected final FileStatusManager fileStatusManager;
+    protected final long previousLatestTimestamp; // compared with FileStatus modification times
+    protected final List<String> previousLatestFiles; // looked up by Path.toString() in determineListable
+    protected long fileCount; // value returned by getListedFileCount()
+    private final long currentTimeMillis; // captured once in the constructor so every status is aged against the same instant
+
+
+    HadoopFileStatusWriter(final ProcessSession session, // no access modifier: callable only from within this package
+                           final FileStatusIterable fileStatusIterable,
+                           final long minimumAge,
+                           final long maximumAge,
+                           final PathFilter pathFilter,
+                           final FileStatusManager fileStatusManager,
+                           final long previousLatestTimestamp,
+                           final List<String> previousLatestFiles) {
+        this.session = session;
+        this.fileStatusIterable = fileStatusIterable;
+        this.minimumAge = minimumAge;
+        this.maximumAge = maximumAge;
+        this.pathFilter = pathFilter;
+        this.fileStatusManager = fileStatusManager;
+        this.previousLatestTimestamp = previousLatestTimestamp;
+        this.previousLatestFiles = previousLatestFiles;
+        currentTimeMillis = System.currentTimeMillis();
+        fileCount = 0L;
+    }
+
+    public abstract void write(); // implemented by each concrete writer
+
+    public long getListedFileCount() { // Returns the current value of fileCount.
+        return fileCount;
+    }
+
+    protected boolean determineListable(final FileStatus status) { // Decides whether a status should be emitted by this listing run.
+
+        final boolean isCopyInProgress = 
status.getPath().getName().endsWith("_COPYING_");
+        final boolean isFilterAccepted = pathFilter.accept(status.getPath());
+        if (isCopyInProgress || !isFilterAccepted) {
+            return false;
+        }
+        // If the file was created during the processor's last iteration we 
have to check if it was already listed
+        if (status.getModificationTime() == previousLatestTimestamp) { // NOTE(review): returns before the min/max age check below - confirm this is intended
+            return !previousLatestFiles.contains(status.getPath().toString());
+        }
+
+        final long fileAge = currentTimeMillis - status.getModificationTime();
+        if (minimumAge > fileAge || fileAge > maximumAge) { // rejects ages outside the inclusive [minimumAge, maximumAge] window
+            return false;
+        }
+
+        return status.getModificationTime() > previousLatestTimestamp; // strictly newer only; the equal case was handled above
+    }
+
+    String getAbsolutePath(final Path path) { // Joins the getName() value of each path component, root first, with "/" separators.
+        final Path parent = path.getParent();
+        final String prefix = (parent == null || parent.getName().equals("")) 
? "" : getAbsolutePath(parent); // recursion stops at a null parent or one with an empty name
+        return prefix + "/" + path.getName();
+    }
+
+    String getPerms(final FsAction action) { // Renders one FsAction as three characters: r, w, x, or "-" for each action not implied.
+        final StringBuilder sb = new StringBuilder();
+
+        sb.append(action.implies(FsAction.READ) ? "r" : "-");
+        sb.append(action.implies(FsAction.WRITE) ? "w" : "-");
+        sb.append(action.implies(FsAction.EXECUTE) ? "x" : "-");
+
+        return sb.toString();
+    }
+
+    String getPermissionsString(final FsPermission permission) { // Concatenates the user, group and other triplets, in that order.
+        return String.format("%s%s%s", getPerms(permission.getUserAction()),
+                getPerms(permission.getGroupAction()), 
getPerms(permission.getOtherAction()));
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/RecordObjectWriter.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/RecordObjectWriter.java
new file mode 100644
index 0000000000..066460a022
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/RecordObjectWriter.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.OutputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.hadoop.ListHDFS.REL_SUCCESS;
+
+public class RecordObjectWriter extends HadoopFileStatusWriter { // Writes every listable file status as a record into a single FlowFile.
+
+    private static final RecordSchema RECORD_SCHEMA;
+
+    private static final String FILENAME = "filename";
+    private static final String PATH = "path";
+    private static final String IS_DIRECTORY = "directory";
+    private static final String SIZE = "size";
+    private static final String LAST_MODIFIED = "lastModified";
+    private static final String PERMISSIONS = "permissions";
+    private static final String OWNER = "owner";
+    private static final String GROUP = "group";
+    private static final String REPLICATION = "replication";
+    private static final String IS_SYM_LINK = "symLink";
+    private static final String IS_ENCRYPTED = "encrypted";
+    private static final String IS_ERASURE_CODED = "erasureCoded";
+
+    static { // builds the fixed schema used for every record this writer emits
+        final List<RecordField> recordFields = new ArrayList<>();
+        recordFields.add(new RecordField(FILENAME, 
RecordFieldType.STRING.getDataType(), false)); // NOTE(review): the trailing false presumably marks the field non-nullable - confirm against RecordField
+        recordFields.add(new RecordField(PATH, 
RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(IS_DIRECTORY, 
RecordFieldType.BOOLEAN.getDataType(), false));
+        recordFields.add(new RecordField(SIZE, 
RecordFieldType.LONG.getDataType(), false));
+        recordFields.add(new RecordField(LAST_MODIFIED, 
RecordFieldType.TIMESTAMP.getDataType(), false));
+        recordFields.add(new RecordField(PERMISSIONS, 
RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(OWNER, 
RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(GROUP, 
RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(REPLICATION, 
RecordFieldType.INT.getDataType()));
+        recordFields.add(new RecordField(IS_SYM_LINK, 
RecordFieldType.BOOLEAN.getDataType()));
+        recordFields.add(new RecordField(IS_ENCRYPTED, 
RecordFieldType.BOOLEAN.getDataType()));
+        recordFields.add(new RecordField(IS_ERASURE_CODED, 
RecordFieldType.BOOLEAN.getDataType()));
+        RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
+    }
+
+
+    private final RecordSetWriterFactory writerFactory;
+    private final ComponentLog logger;
+
+    public RecordObjectWriter(final ProcessSession session, // first eight arguments are forwarded unchanged to the superclass constructor
+                              final FileStatusIterable fileStatuses,
+                              final long minimumAge,
+                              final long maximumAge,
+                              final PathFilter pathFilter,
+                              final FileStatusManager fileStatusManager,
+                              final long previousLatestModificationTime,
+                              final List<String> previousLatestFiles,
+                              final  RecordSetWriterFactory writerFactory,
+                              final ComponentLog logger) {
+        super(session, fileStatuses, minimumAge, maximumAge, pathFilter, 
fileStatusManager, previousLatestModificationTime, previousLatestFiles);
+        this.writerFactory = writerFactory;
+        this.logger = logger;
+    }
+
+    @Override
+    public void write() { // Streams one record per listable status into one FlowFile; the FlowFile is removed when no record was written.
+        FlowFile flowFile = session.create();
+
+        final WriteResult writeResult;
+        try (
+                final OutputStream out = session.write(flowFile);
+                final RecordSetWriter recordWriter = 
writerFactory.createWriter(logger, RECORD_SCHEMA, out, flowFile)
+        ) {
+            recordWriter.beginRecordSet();
+            for (FileStatus status : fileStatusIterable) {
+                if (determineListable(status)) { // statuses that fail the listing checks are skipped silently
+                    recordWriter.write(createRecordForListing(status));
+                    fileStatusManager.update(status); // presumably records this entry for the next listing cycle - verify in FileStatusManager
+                }
+            }
+            writeResult = recordWriter.finishRecordSet();
+        } catch (Exception e) { // also catches failures raised while closing the stream and the record writer
+            throw new ProcessException("An error occurred while writing 
results", e);
+        }
+
+        fileCount = writeResult.getRecordCount(); // listed-file count is taken from the writer's result
+        if (fileCount == 0) {
+            session.remove(flowFile); // nothing was written: discard the FlowFile instead of transferring it
+        } else {
+            final Map<String, String> attributes = new 
HashMap<>(writeResult.getAttributes());
+            attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
+            flowFile = session.putAllAttributes(flowFile, attributes); // continue with the FlowFile reference the session returns
+            session.transfer(flowFile, REL_SUCCESS);
+        }
+    }
+
+    private Record createRecordForListing(final FileStatus fileStatus) { // Maps one FileStatus onto a MapRecord that uses RECORD_SCHEMA.
+        final Map<String, Object> values = new HashMap<>();
+        values.put(FILENAME, fileStatus.getPath().getName());
+        values.put(PATH, getAbsolutePath(fileStatus.getPath().getParent())); // built from the parent path, so the file name is not included
+        values.put(OWNER, fileStatus.getOwner());
+        values.put(GROUP, fileStatus.getGroup());
+        values.put(LAST_MODIFIED, new 
Timestamp(fileStatus.getModificationTime())); // java.sql.Timestamp built from the modification time
+        values.put(SIZE, fileStatus.getLen());
+        values.put(REPLICATION, fileStatus.getReplication());
+
+        final FsPermission permission = fileStatus.getPermission();
+        final String perms = getPermissionsString(permission);
+        values.put(PERMISSIONS, perms);
+
+        values.put(IS_DIRECTORY, fileStatus.isDirectory());
+        values.put(IS_SYM_LINK, fileStatus.isSymlink());
+        values.put(IS_ENCRYPTED, fileStatus.isEncrypted());
+        values.put(IS_ERASURE_CODED, fileStatus.isErasureCoded());
+
+        return new MapRecord(RECORD_SCHEMA, values);
+    }
+}
+
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index 6f3ca245de..eb84fd33cf 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -26,10 +26,11 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.util.MockComponentLog;
 import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.BeforeEach;
@@ -45,39 +46,35 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 
-import static 
org.apache.nifi.processors.hadoop.ListHDFS.FILTER_DIRECTORIES_AND_FILES_VALUE;
-import static 
org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FILES_ONLY_VALUE;
-import static 
org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FULL_PATH_VALUE;
+import static org.apache.nifi.processors.hadoop.ListHDFS.LATEST_TIMESTAMP_KEY;
+import static org.apache.nifi.processors.hadoop.ListHDFS.REL_SUCCESS;
+import static 
org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_DIRECTORIES_AND_FILES;
+import static 
org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_MODE_FILES_ONLY;
+import static 
org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_MODE_FULL_PATH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
-public class TestListHDFS {
+class TestListHDFS {
 
     private TestRunner runner;
     private ListHDFSWithMockedFileSystem proc;
-    private NiFiProperties mockNiFiProperties;
-    private KerberosProperties kerberosProperties;
     private MockComponentLog mockLogger;
 
     @BeforeEach
     public void setup() throws InitializationException {
-        mockNiFiProperties = mock(NiFiProperties.class);
-        
when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
-        kerberosProperties = new KerberosProperties(null);
+        final KerberosProperties kerberosProperties = new 
KerberosProperties(null);
 
         proc = new ListHDFSWithMockedFileSystem(kerberosProperties);
         mockLogger = spy(new MockComponentLog(UUID.randomUUID().toString(), 
proc));
@@ -88,16 +85,11 @@ public class TestListHDFS {
     }
 
     @Test
-    public void testListingWithValidELFunction() throws InterruptedException {
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
+    void testListingWithValidELFunction() {
+        addFileStatus("/test", "testFile.txt", false);
 
         runner.setProperty(ListHDFS.DIRECTORY, 
"${literal('/test'):substring(0,5)}");
 
-        // first iteration will not pick up files because it has to instead 
check timestamps.
-        // We must then wait long enough to ensure that the listing can be 
performed safely and
-        // run the Processor again.
-        runner.run();
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -107,53 +99,39 @@ public class TestListHDFS {
     }
 
     @Test
-    public void testListingWithFilter() throws InterruptedException {
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
+    void testListingWithFilter() {
+        addFileStatus("/test", "testFile.txt", false);
 
         runner.setProperty(ListHDFS.DIRECTORY, 
"${literal('/test'):substring(0,5)}");
         runner.setProperty(ListHDFS.FILE_FILTER, "[^test].*");
 
-        // first iteration will not pick up files because it has to instead 
check timestamps.
-        // We must then wait long enough to ensure that the listing can be 
performed safely and
-        // run the Processor again.
-        runner.run();
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
     }
 
     @Test
-    public void testListingWithInvalidELFunction() throws InterruptedException 
{
+    void testListingWithInvalidELFunction() {
         runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):foo()}");
         runner.assertNotValid();
     }
 
     @Test
-    public void testListingWithUnrecognizedELFunction() throws 
InterruptedException {
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
+    void testListingWithUnrecognizedELFunction() {
+        addFileStatus("/test", "testFile.txt", false);
 
         runner.setProperty(ListHDFS.DIRECTORY, 
"data_${literal('testing'):substring(0,4)%7D");
 
-        // first iteration will not pick up files because it has to instead 
check timestamps.
-        // We must then wait long enough to ensure that the listing can be 
performed safely and
-        // run the Processor again.
-        runner.run();
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
-        runner.run();
+        final AssertionError assertionError = 
assertThrows(AssertionError.class, () -> runner.run());
+        assertEquals(IllegalArgumentException.class, 
assertionError.getCause().getClass());
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
     }
 
     @Test
-    public void testListingHasCorrectAttributes() throws InterruptedException {
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
+    void testListingHasCorrectAttributes() {
+        addFileStatus("/test", "testFile.txt", false);
 
-        // first iteration will not pick up files because it has to instead 
check timestamps.
-        // We must then wait long enough to ensure that the listing can be 
performed safely and
-        // run the Processor again.
-        runner.run();
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -164,30 +142,24 @@ public class TestListHDFS {
 
 
     @Test
-    public void testRecursiveWithDefaultFilterAndFilterMode() throws 
InterruptedException {
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/.testFile.txt")));
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
+    void testRecursiveWithDefaultFilterAndFilterMode() {
+        addFileStatus("/test", ".testFile.txt", false);
+        addFileStatus("/test", "testFile.txt", false);
 
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new 
FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/1.txt")));
-
-        // first iteration will not pick up files because it has to instead 
check timestamps.
-        // We must then wait long enough to ensure that the listing can be 
performed safely and
-        // run the Processor again.
-        runner.run();
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
+        addFileStatus("/test", "testDir", true);
+        addFileStatus("/test/testDir", "1.txt", false);
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
 
         final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
-        for (int i=0; i < 2; i++) {
+        for (int i = 0; i < 2; i++) {
             final MockFlowFile ff = flowFiles.get(i);
             final String filename = ff.getAttribute("filename");
 
             if (filename.equals("testFile.txt")) {
                 ff.assertAttributeEquals("path", "/test");
-            } else if ( filename.equals("1.txt")) {
+            } else if (filename.equals("1.txt")) {
                 ff.assertAttributeEquals("path", "/test/testDir");
             } else {
                 fail("filename was " + filename);
@@ -196,30 +168,22 @@ public class TestListHDFS {
     }
 
     @Test
-    public void testRecursiveWithCustomFilterDirectoriesAndFiles() throws 
InterruptedException, IOException {
+    void testRecursiveWithCustomFilterDirectoriesAndFiles() {
         // set custom regex filter and filter mode
         runner.setProperty(ListHDFS.FILE_FILTER, ".*txt.*");
-        runner.setProperty(ListHDFS.FILE_FILTER_MODE, 
FILTER_DIRECTORIES_AND_FILES_VALUE.getValue());
-
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.out")));
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
+        runner.setProperty(ListHDFS.FILE_FILTER_MODE, 
FILTER_DIRECTORIES_AND_FILES.getValue());
 
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new 
FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/1.txt")));
+        addFileStatus("/test", "testFile.out", false);
+        addFileStatus("/test", "testFile.txt", false);
+        addFileStatus("/test", "testDir", true);
+        addFileStatus("/test/testDir", "1.txt", false);
+        addFileStatus("/test/testDir", "anotherDir", true);
+        addFileStatus("/test/testDir/anotherDir", "2.out", false);
+        addFileStatus("/test/testDir/anotherDir", "2.txt", false);
+        addFileStatus("/test", "txtDir", true);
+        addFileStatus("/test/txtDir", "3.out", false);
+        addFileStatus("/test/txtDir", "3.txt", false);
 
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new 
FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/anotherDir")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), 
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/anotherDir/2.out")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), 
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/anotherDir/2.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir")));
-        proc.fileSystem.addFileStatus(new Path("/test/txtDir"), new 
FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/txtDir/3.out")));
-        proc.fileSystem.addFileStatus(new Path("/test/txtDir"), new 
FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/txtDir/3.txt")));
-
-        // first iteration will not pick up files because it has to instead 
check timestamps.
-        // We must then wait long enough to ensure that the listing can be 
performed safely and
-        // run the Processor again.
-        runner.run();
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
@@ -240,28 +204,21 @@ public class TestListHDFS {
     }
 
     @Test
-    public void testRecursiveWithCustomFilterFilesOnly() throws 
InterruptedException, IOException {
+    void testRecursiveWithCustomFilterFilesOnly() {
         // set custom regex filter and filter mode
         runner.setProperty(ListHDFS.FILE_FILTER, "[^\\.].*\\.txt");
-        runner.setProperty(ListHDFS.FILE_FILTER_MODE, 
FILTER_FILES_ONLY_VALUE.getValue());
-
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.out")));
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/.partfile.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new 
FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/1.txt")));
+        runner.setProperty(ListHDFS.FILE_FILTER_MODE, 
FILTER_MODE_FILES_ONLY.getValue());
 
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new 
FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/anotherDir")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), 
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/anotherDir/.txt")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), 
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/anotherDir/2.out")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), 
new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/anotherDir/2.txt")));
+        addFileStatus("/test", "testFile.out", false);
+        addFileStatus("/test", "testFile.txt", false);
+        addFileStatus("/test", ".partfile.txt", false);
+        addFileStatus("/test", "testDir", true);
+        addFileStatus("/test/testDir", "1.txt", false);
+        addFileStatus("/test/testDir", "anotherDir", true);
+        addFileStatus("/test/testDir/anotherDir", ".txt", false);
+        addFileStatus("/test/testDir/anotherDir", "2.out", false);
+        addFileStatus("/test/testDir/anotherDir", "2.txt", false);
 
-        // first iteration will not pick up files because it has to instead 
check timestamps.
-        // We must then wait long enough to ensure that the listing can be 
performed safely and
-        // run the Processor again.
-        runner.run();
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
@@ -271,55 +228,41 @@ public class TestListHDFS {
             final MockFlowFile ff = flowFiles.get(i);
             final String filename = ff.getAttribute("filename");
 
-            if (filename.equals("testFile.txt")) {
-                ff.assertAttributeEquals("path", "/test");
-            } else if (filename.equals("1.txt")) {
-                ff.assertAttributeEquals("path", "/test/testDir");
-            } else if (filename.equals("2.txt")) {
-                ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
-            } else {
-                fail("filename was " + filename);
+            switch (filename) {
+                case "testFile.txt":
+                    ff.assertAttributeEquals("path", "/test");
+                    break;
+                case "1.txt":
+                    ff.assertAttributeEquals("path", "/test/testDir");
+                    break;
+                case "2.txt":
+                    ff.assertAttributeEquals("path", 
"/test/testDir/anotherDir");
+                    break;
+                default:
+                    fail("filename was " + filename);
+                    break;
             }
         }
     }
 
     @Test
-    public void 
testRecursiveWithCustomFilterFullPathWithoutSchemeAndAuthority() throws 
InterruptedException, IOException {
+    void testRecursiveWithCustomFilterFullPathWithoutSchemeAndAuthority() {
         // set custom regex filter and filter mode
         runner.setProperty(ListHDFS.FILE_FILTER, "(/.*/)*anotherDir/1\\..*");
-        runner.setProperty(ListHDFS.FILE_FILTER_MODE, 
FILTER_FULL_PATH_VALUE.getValue());
-
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test"),
-                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir"),
-                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir/1.out")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir/1.txt")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir/2.out")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir/2.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir"),
-                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/someDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
-
-        // first iteration will not pick up files because it has to instead 
check timestamps.
-        // We must then wait long enough to ensure that the listing can be 
performed safely and
-        // run the Processor again.
-        runner.run();
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
+        runner.setProperty(ListHDFS.FILE_FILTER_MODE, 
FILTER_MODE_FULL_PATH.getValue());
+
+        addFileStatus("/test", "testFile.out", false);
+        addFileStatus("/test", "testFile.txt", false);
+        addFileStatus("/test", "testDir", true);
+        addFileStatus("/test/testDir", "1.txt", false);
+        addFileStatus("/test/testDir", "anotherDir", true);
+        addFileStatus("/test/testDir/anotherDir", "1.out", false);
+        addFileStatus("/test/testDir/anotherDir", "1.txt", false);
+        addFileStatus("/test/testDir/anotherDir", "2.out", false);
+        addFileStatus("/test/testDir/anotherDir", "2.txt", false);
+        addFileStatus("/test/testDir", "someDir", true);
+        addFileStatus("/test/testDir/someDir", "1.out", false);
+
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
@@ -340,42 +283,23 @@ public class TestListHDFS {
     }
 
     @Test
-    public void testRecursiveWithCustomFilterFullPathWithSchemeAndAuthority() 
throws InterruptedException, IOException {
+    void testRecursiveWithCustomFilterFullPathWithSchemeAndAuthority() {
         // set custom regex filter and filter mode
         runner.setProperty(ListHDFS.FILE_FILTER, 
"hdfs://hdfscluster:8020(/.*/)*anotherDir/1\\..*");
-        runner.setProperty(ListHDFS.FILE_FILTER_MODE, 
FILTER_FULL_PATH_VALUE.getValue());
-
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test"),
-                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir"),
-                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir/1.out")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir/1.txt")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir/2.out")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir/2.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir"),
-                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/someDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
-
-        // first iteration will not pick up files because it has to instead 
check timestamps.
-        // We must then wait long enough to ensure that the listing can be 
performed safely and
-        // run the Processor again.
-        runner.run();
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
+        runner.setProperty(ListHDFS.FILE_FILTER_MODE, 
FILTER_MODE_FULL_PATH.getValue());
+
+        addFileStatus("/test", "testFile.out", false);
+        addFileStatus("/test", "testFile.txt", false);
+        addFileStatus("/test", "testDir", true);
+        addFileStatus("/test/testDir", "1.txt", false);
+        addFileStatus("/test/testDir", "anotherDir", true);
+        addFileStatus("/test/testDir/anotherDir", "1.out", false);
+        addFileStatus("/test/testDir/anotherDir", "1.txt", false);
+        addFileStatus("/test/testDir/anotherDir", "2.out", false);
+        addFileStatus("/test/testDir/anotherDir", "2.txt", false);
+        addFileStatus("/test/testDir", "someDir", true);
+        addFileStatus("/test/testDir/someDir", "1.out", false);
+
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
@@ -396,18 +320,11 @@ public class TestListHDFS {
     }
 
     @Test
-    public void testNotRecursive() throws InterruptedException {
+    void testNotRecursive() {
         runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new 
FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/1.txt")));
-
-        // first iteration will not pick up files because it has to instead 
check timestamps.
-        // We must then wait long enough to ensure that the listing can be 
performed safely and
-        // run the Processor again.
-        runner.run();
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
+        addFileStatus("/test", "testFile.txt", false);
+        addFileStatus("/test", "testDir", true);
+        addFileStatus("/test/testDir", "1.txt", false);
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -419,14 +336,10 @@ public class TestListHDFS {
 
 
     @Test
-    public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() throws 
IOException, InterruptedException {
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 1999L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
+    void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() throws 
IOException {
+        addFileStatus("/test", "testFile.txt", false, 1999L, 0L);
+
 
-        // first iteration will not pick up files because it has to instead 
check timestamps.
-        // We must then wait long enough to ensure that the listing can be 
performed safely and
-        // run the Processor again.
-        runner.run();
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
@@ -438,7 +351,7 @@ public class TestListHDFS {
         runner.clearTransferState();
 
         // add new file to pull
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 2000L, 0L, create777(), "owner", "group", new 
Path("/test/testFile2.txt")));
+        addFileStatus("/test", "testFile2.txt", false, 2000L, 0L);
 
         runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);
 
@@ -453,66 +366,43 @@ public class TestListHDFS {
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
 
         runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, false);
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
-
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
-        Map<String, String> newState = 
runner.getStateManager().getState(Scope.CLUSTER).toMap();
-        assertEquals("2000", newState.get(ListHDFS.LISTING_TIMESTAMP_KEY));
-        assertEquals("1999", newState.get(ListHDFS.EMITTED_TIMESTAMP_KEY));
 
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run();
 
-        newState = runner.getStateManager().getState(Scope.CLUSTER).toMap();
-        assertEquals("2000", newState.get(ListHDFS.LISTING_TIMESTAMP_KEY));
-        assertEquals("2000", newState.get(ListHDFS.EMITTED_TIMESTAMP_KEY));
-
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+        Map<String, String> newState = 
runner.getStateManager().getState(Scope.CLUSTER).toMap();
+        assertEquals("2000", newState.get(LATEST_TIMESTAMP_KEY));
     }
 
     @Test
-    public void testOnlyNewestEntriesHeldBack() throws InterruptedException {
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 8L, 0L, create777(), "owner", "group", new 
Path("/test/testFile2.txt")));
+    void testEntriesWithSameTimestampOnlyAddedOnce() {
+        addFileStatus("/test", "testFile.txt", false, 1L, 0L);
+        addFileStatus("/test", "testFile2.txt", false, 1L, 8L);
 
         // this is a directory, so it won't be counted toward the entries
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
true, 1, 1L, 8L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new 
FileStatus(1L, false, 1, 1L, 100L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/1.txt")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new 
FileStatus(1L, false, 1, 1L, 100L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/2.txt")));
-
-        // The first iteration should pick up 2 files with the smaller 
timestamps.
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+        addFileStatus("/test", "testDir", true, 1L, 8L);
+        addFileStatus("/test/testDir", "1.txt", false, 1L, 100L);
 
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
+        // The first iteration should pick up all 3 files, which share the 
same modification timestamp.
         runner.run();
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
 
-        // Next iteration should pick up the other 2 files, since nothing else 
was added.
-        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 4);
+        addFileStatus("/test/testDir", "2.txt", false, 1L, 100L);
 
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new 
FileStatus(1L, false, 1, 1L, 110L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/3.txt")));
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run();
-
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 4);
-
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 5);
     }
 
     @Test
-    public void testMinAgeMaxAge() throws IOException, InterruptedException {
-        long now = new Date().getTime();
-        long oneHourAgo = now - 3600000;
-        long twoHoursAgo = now - 2*3600000;
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, now, now, create777(), "owner", "group", new 
Path("/test/willBeIgnored.txt")));
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, now-5, now-5, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, oneHourAgo, oneHourAgo, create777(), "owner", "group", new 
Path("/test/testFile1.txt")));
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, twoHoursAgo, twoHoursAgo, create777(), "owner", "group", new 
Path("/test/testFile2.txt")));
+    void testMinAgeMaxAge() throws IOException {
+
+        final long now = new Date().getTime();
+        final long oneHourAgo = now - 3600000;
+        final long twoHoursAgo = now - 2 * 3600000;
+
+        addFileStatus("/test", "testFile.txt", false, now - 5, now - 5);
+        addFileStatus("/test", "testFile1.txt", false, oneHourAgo, oneHourAgo);
+        addFileStatus("/test", "testFile2.txt", false, twoHoursAgo, 
twoHoursAgo);
 
         // all files
         runner.run();
@@ -522,38 +412,27 @@ public class TestListHDFS {
         runner.getStateManager().clear(Scope.CLUSTER);
 
         // invalid min_age > max_age
-        runner.setProperty(ListHDFS.MIN_AGE, "30 sec");
-        runner.setProperty(ListHDFS.MAX_AGE, "1 sec");
+        runner.setProperty(ListHDFS.MINIMUM_FILE_AGE, "30 sec");
+        runner.setProperty(ListHDFS.MAXIMUM_FILE_AGE, "1 sec");
         runner.assertNotValid();
 
         // only one file (one hour ago)
-        runner.setProperty(ListHDFS.MIN_AGE, "30 sec");
-        runner.setProperty(ListHDFS.MAX_AGE, "90 min");
+        runner.setProperty(ListHDFS.MINIMUM_FILE_AGE, "30 sec");
+        runner.setProperty(ListHDFS.MAXIMUM_FILE_AGE, "90 min");
         runner.assertValid();
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
-        runner.run(); // will ignore the file for this cycle
-        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
 
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run();
 
-        // Next iteration should pick up the file, since nothing else was 
added.
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
         
runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0).assertAttributeEquals("filename",
 "testFile1.txt");
         runner.clearTransferState();
         runner.getStateManager().clear(Scope.CLUSTER);
 
         // two files (one hour ago and two hours ago)
-        runner.setProperty(ListHDFS.MIN_AGE, "30 sec");
-        runner.removeProperty(ListHDFS.MAX_AGE);
+        runner.setProperty(ListHDFS.MINIMUM_FILE_AGE, "30 sec");
+        runner.removeProperty(ListHDFS.MAXIMUM_FILE_AGE);
         runner.assertValid();
 
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
-        runner.run();
-
-        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
-
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
@@ -561,79 +440,47 @@ public class TestListHDFS {
         runner.getStateManager().clear(Scope.CLUSTER);
 
         // two files (now and one hour ago)
-        runner.setProperty(ListHDFS.MIN_AGE, "0 sec");
-        runner.setProperty(ListHDFS.MAX_AGE, "90 min");
+        runner.setProperty(ListHDFS.MINIMUM_FILE_AGE, "0 sec");
+        runner.setProperty(ListHDFS.MAXIMUM_FILE_AGE, "90 min");
         runner.assertValid();
 
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
     }
 
     @Test
-    public void testListAfterDirectoryChange() throws InterruptedException {
-        proc.fileSystem.addFileStatus(new Path("/test1"), new FileStatus(1L, 
false, 1, 1L, 100L,0L, create777(), "owner", "group", new 
Path("/test1/testFile-1_1.txt")));
-        proc.fileSystem.addFileStatus(new Path("/test2"), new FileStatus(1L, 
false, 1, 1L, 150L,0L, create777(), "owner", "group", new 
Path("/test2/testFile-2_1.txt")));
-        proc.fileSystem.addFileStatus(new Path("/test1"), new FileStatus(1L, 
false, 1, 1L, 200L,0L, create777(), "owner", "group", new 
Path("/test1/testFile-1_2.txt")));
+    void testListAfterDirectoryChange() {
+        addFileStatus("/test1", "testFile-1_1.txt", false, 100L, 0L);
+        addFileStatus("/test1", "testFile-1_2.txt", false, 200L, 0L);
+        addFileStatus("/test2", "testFile-2_1.txt", false, 150L, 0L);
 
         runner.setProperty(ListHDFS.DIRECTORY, "/test1");
 
-        runner.run(); // Initial run, latest file from /test1 will be ignored
+        runner.run(); // Initial run, both files from /test1 are listed
 
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
-        runner.run(); // Latest file i.e. testFile-1_2.txt from /test1 should 
also be picked up now
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
 
         runner.setProperty(ListHDFS.DIRECTORY, "/test2"); // Changing 
directory should reset the state
 
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
-        runner.run(); // Will ignore the files for this cycle
-        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
-
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
         runner.run(); // Since state has been reset, testFile-2_1.txt from 
/test2 should be picked up
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
     }
 
     @Test
-    public void testListingEmptyDir() throws InterruptedException, IOException 
{
+    void testListingEmptyDir() {
         runner.setProperty(ListHDFS.DIRECTORY, "/test/emptyDir");
 
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test"),
-                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/emptyDir")));
-
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test"),
-                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir"),
-                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir/1.out")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir/1.txt")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir/2.out")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir/2.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir"),
-                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/someDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
-
-        // first iteration will not pick up files because it has to instead 
check timestamps.
-        // We must then wait long enough to ensure that the listing can be 
performed safely and
-        // run the Processor again.
-        runner.run();
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
+        addFileStatus("/test", "testFile.out", false);
+        addFileStatus("/test", "testFile.txt", false);
+        addFileStatus("/test", "emptyDir", true);
+        addFileStatus("/test/testDir", "1.txt", false);
+        addFileStatus("/test/testDir/anotherDir", "1.out", false);
+        addFileStatus("/test/testDir/anotherDir", "1.txt", false);
+        addFileStatus("/test/testDir/anotherDir", "2.out", false);
+        addFileStatus("/test/testDir/anotherDir", "2.txt", false);
+        addFileStatus("/test/testDir/someDir", "1.out", false);
+
         runner.run();
 
         // verify that no messages were logged at the error level
@@ -641,9 +488,9 @@ public class TestListHDFS {
         final ArgumentCaptor<Throwable> throwableArgumentCaptor = 
ArgumentCaptor.forClass(Throwable.class);
         verify(mockLogger, atLeast(0)).error(anyString(), 
throwableArgumentCaptor.capture());
         // if error.(message, throwable) was called, ignore JobConf CNFEs 
since mapreduce libs are not included as dependencies
-        
assertTrue(throwableArgumentCaptor.getAllValues().stream().flatMap(Stream::of)
+        assertTrue(throwableArgumentCaptor.getAllValues().stream()
                 // check that there are no throwables that are not of JobConf 
CNFE exceptions
-                .noneMatch(throwable -> !(throwable instanceof 
ClassNotFoundException && throwable.getMessage().contains("JobConf"))));
+                .allMatch(throwable -> throwable instanceof 
ClassNotFoundException && throwable.getMessage().contains("JobConf")));
         verify(mockLogger, never()).error(anyString(), any(Object[].class));
         verify(mockLogger, never()).error(anyString(), any(Object[].class), 
any(Throwable.class));
 
@@ -654,45 +501,22 @@ public class TestListHDFS {
     }
 
     @Test
-    public void testListingNonExistingDir() throws InterruptedException, 
IOException {
-        String nonExistingPath = "/test/nonExistingDir";
+    void testListingNonExistingDir() {
+        final String nonExistingPath = "/test/nonExistingDir";
         runner.setProperty(ListHDFS.DIRECTORY, nonExistingPath);
 
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test"),
-                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/emptyDir")));
-
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test"),
-                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir"),
-                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir/1.out")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir/1.txt")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir/2.out")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/anotherDir/2.txt")));
-
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir"),
-                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
-        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", 
"/test/testDir/someDir"),
-                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", 
"group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
-
-        // first iteration will not pick up files because it has to instead 
check timestamps.
-        // We must then wait long enough to ensure that the listing can be 
performed safely and
-        // run the Processor again.
-        runner.run();
-        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * 
ListHDFS.LISTING_LAG_NANOS));
-        runner.run();
+        addFileStatus("/test", "testFile.out", false);
+        addFileStatus("/test", "testFile.txt", false);
+        addFileStatus("/test", "emptyDir", true);
+        addFileStatus("/test/testDir", "1.txt", false);
+        addFileStatus("/test/testDir/anotherDir", "1.out", false);
+        addFileStatus("/test/testDir/anotherDir", "1.txt", false);
+        addFileStatus("/test/testDir/anotherDir", "2.out", false);
+        addFileStatus("/test/testDir/anotherDir", "2.txt", false);
+        addFileStatus("/test/testDir/someDir", "1.out", false);
+
+        final AssertionError assertionError = 
assertThrows(AssertionError.class, () -> runner.run());
+        assertEquals(ProcessException.class, 
assertionError.getCause().getClass());
 
         // assert that no files were listed
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
@@ -700,12 +524,41 @@ public class TestListHDFS {
         runner.assertPenalizeCount(0);
     }
 
+    @Test
+    void testRecordWriter() throws InitializationException {
+        runner.setProperty(ListHDFS.DIRECTORY, "/test");
+
+        final MockRecordWriter recordWriter = new MockRecordWriter(null, 
false);
+        runner.addControllerService("record-writer", recordWriter);
+        runner.enableControllerService(recordWriter);
+        runner.setProperty(ListHDFS.RECORD_WRITER, "record-writer");
+
+        addFileStatus("/test", "testFile.out", false);
+        addFileStatus("/test", "testFile.txt", false);
+        addFileStatus("/test", "testDir", true);
+        addFileStatus("/test/testDir", "1.txt", false);
+        addFileStatus("/test/testDir", "anotherDir", true);
+        addFileStatus("/test/testDir/anotherDir", "1.out", false);
+        addFileStatus("/test/testDir/anotherDir", "1.txt", false);
+        addFileStatus("/test/testDir/anotherDir", "2.out", false);
+        addFileStatus("/test/testDir/anotherDir", "2.txt", false);
+        addFileStatus("/test/testDir", "someDir", true);
+        addFileStatus("/test/testDir/someDir", "1.out", false);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1);
+        final List<MockFlowFile> flowFilesForRelationship = 
runner.getFlowFilesForRelationship(REL_SUCCESS);
+        final MockFlowFile flowFile = flowFilesForRelationship.get(0);
+        flowFile.assertAttributeEquals("record.count", "8");
+    }
+
     private FsPermission create777() {
-        return new FsPermission((short) 0777);
+        return new FsPermission((short) 0777); // octal literal (rwxrwxrwx); 0x777 would be a different mode
     }
 
 
-    private class ListHDFSWithMockedFileSystem extends ListHDFS {
+    private static class ListHDFSWithMockedFileSystem extends ListHDFS {
         private final MockFileSystem fileSystem = new MockFileSystem();
         private final KerberosProperties testKerberosProps;
 
@@ -724,25 +577,16 @@ public class TestListHDFS {
         }
 
         @Override
-        protected File getPersistenceFile() {
-            return new File("target/conf/state-file");
-        }
-
-        @Override
-        protected FileSystem getFileSystem(final Configuration config) throws 
IOException {
+        protected FileSystem getFileSystem(final Configuration config) {
             return fileSystem;
         }
     }
 
-    private class MockFileSystem extends FileSystem {
+    private static class MockFileSystem extends FileSystem {
         private final Map<Path, Set<FileStatus>> fileStatuses = new 
HashMap<>();
 
         public void addFileStatus(final Path parent, final FileStatus child) {
-            Set<FileStatus> children = fileStatuses.get(parent);
-            if (children == null) {
-                children = new HashSet<>();
-                fileStatuses.put(parent, children);
-            }
+            final Set<FileStatus> children = 
fileStatuses.computeIfAbsent(parent, k -> new HashSet<>());
 
             children.add(child);
 
@@ -770,33 +614,33 @@ public class TestListHDFS {
         }
 
         @Override
-        public FSDataInputStream open(final Path f, final int bufferSize) 
throws IOException {
+        public FSDataInputStream open(final Path f, final int bufferSize) {
             return null;
         }
 
         @Override
         public FSDataOutputStream create(final Path f, final FsPermission 
permission, final boolean overwrite, final int bufferSize, final short 
replication,
-                                         final long blockSize, final 
Progressable progress) throws IOException {
+                                         final long blockSize, final 
Progressable progress) {
             return null;
         }
 
         @Override
-        public FSDataOutputStream append(final Path f, final int bufferSize, 
final Progressable progress) throws IOException {
+        public FSDataOutputStream append(final Path f, final int bufferSize, 
final Progressable progress) {
             return null;
         }
 
         @Override
-        public boolean rename(final Path src, final Path dst) throws 
IOException {
+        public boolean rename(final Path src, final Path dst) {
             return false;
         }
 
         @Override
-        public boolean delete(final Path f, final boolean recursive) throws 
IOException {
+        public boolean delete(final Path f, final boolean recursive) {
             return false;
         }
 
         @Override
-        public FileStatus[] listStatus(final Path f) throws 
FileNotFoundException, IOException {
+        public FileStatus[] listStatus(final Path f) throws IOException {
             return fileStatuses.keySet().stream()
                     // find the key in fileStatuses that matches the given 
Path f
                     .filter(pathKey -> f.isAbsoluteAndSchemeAuthorityNull()
@@ -825,14 +669,31 @@ public class TestListHDFS {
         }
 
         @Override
-        public boolean mkdirs(final Path f, final FsPermission permission) 
throws IOException {
+        public boolean mkdirs(final Path f, final FsPermission permission) {
             return false;
         }
 
         @Override
-        public FileStatus getFileStatus(final Path f) throws IOException {
-            return null;
+        public FileStatus getFileStatus(final Path path) {
+            final Optional<FileStatus> fileStatus = 
fileStatuses.values().stream()
+                    .flatMap(Set::stream)
+                    .filter(fs -> fs.getPath().equals(path))
+                    .findFirst();
+            if (fileStatus.isEmpty()) {
+                throw new IllegalArgumentException("Could not find 
FileStatus");
+            }
+            return fileStatus.get();
         }
+    }
+
+    private void addFileStatus(final String path, final String filename, final 
boolean isDirectory, final long modificationTime, final long accessTime) {
+        final Path fullPath = new Path("hdfs", "hdfscluster:8020", path);
+        final Path filePath = new Path(fullPath, filename);
+        final FileStatus fileStatus = new FileStatus(1L, isDirectory, 1, 1L, 
modificationTime, accessTime, create777(), "owner", "group", filePath);
+        proc.fileSystem.addFileStatus(fullPath, fileStatus);
+    }
 
+    private void addFileStatus(final String path, final String filename, final 
boolean isDirectory) {
+        addFileStatus(path, filename, isDirectory, 0L, 0L);
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFSPerformanceIT.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFSPerformanceIT.java
new file mode 100644
index 0000000000..64ed58cea2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFSPerformanceIT.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.processors.hadoop.util.FilterMode;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.UUID;
+
+import static org.mockito.Mockito.spy;
+
+/**
+ * To compare different ListHDFS implementations, change the ancestor class of 
ListHDFSWithMockedFileSystem to the implementation in question.
+ * Provide HADOOP_RESOURCE_CONFIG and ROOT_DIR, then set NUM_CHILDREN (used as both the branching factor and the depth of the 
k-ary complete HDFS tree) and NUM_OF_FILES (the number of files created in each directory).
+ * First create the structure by running the createHdfsNaryCompleteTree() test 
case. Then run the testListHdfsTimeElapsed() test case with
+ * the implementation to test.
+ */
+@Disabled("This is a performance test and should be run manually")
+class TestListHDFSPerformanceIT {
+
+    private static final long BYTE_TO_MB = 1024 * 1024;
+    private static final String HADOOP_RESOURCE_CONFIG = "???";
+    private static final FileSystem FILE_SYSTEM = getFileSystem();
+    private static final String ROOT_DIR = "???";
+    private static final int NUM_CHILDREN = 3;
+    private static final int NUM_OF_FILES = 1000;
+
+
+    private TestRunner runner;
+    private MockComponentLog mockLogger;
+    private ListHDFSWithMockedFileSystem proc;
+
+
+    @BeforeEach
+    public void setup() throws InitializationException {
+        final KerberosProperties kerberosProperties = new 
KerberosProperties(null);
+
+        proc = new ListHDFSWithMockedFileSystem(kerberosProperties);
+        mockLogger = spy(new MockComponentLog(UUID.randomUUID().toString(), 
proc));
+        runner = TestRunners.newTestRunner(proc, mockLogger);
+
+        runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, 
HADOOP_RESOURCE_CONFIG);
+        runner.setProperty(ListHDFS.DIRECTORY, ROOT_DIR);
+        runner.setProperty(ListHDFS.FILE_FILTER_MODE, 
FilterMode.FILTER_DIRECTORIES_AND_FILES.getValue());
+        runner.setProperty(ListHDFS.FILE_FILTER, "[^\\.].*\\.txt");
+    }
+
+    @Test
+    @Disabled("Enable this test to create an HDFS file tree")
+    void createHdfsNaryCompleteTree() throws IOException {
+        createTree(FILE_SYSTEM, new Path(ROOT_DIR), 0);
+    }
+
+    /**
+     * Prints the elapsed time of a single run; the reported memory usage increase is only an estimate.
+     */
+    @Test
+    void testListHdfsTimeElapsed() {
+        final Runtime runtime = Runtime.getRuntime();
+        long usedMemoryBefore = getCurrentlyUsedMemory(runtime);
+        Instant start = Instant.now();
+
+        runner.run();
+
+        Instant finish = Instant.now();
+        long timeElapsed = Duration.between(start, finish).toMillis();
+        System.out.println("TIME ELAPSED: " + timeElapsed);
+
+        long usedMemoryAfter = getCurrentlyUsedMemory(runtime);
+        System.out.println("Memory increased (MB):" + (usedMemoryAfter - 
usedMemoryBefore));
+    }
+
+
+    private long getCurrentlyUsedMemory(final Runtime runtime) {
+        return (runtime.totalMemory() - runtime.freeMemory()) / BYTE_TO_MB;
+    }
+
+    private void createTree(FileSystem fileSystem, Path currentPath, int 
depth) throws IOException {
+        if (depth >= NUM_CHILDREN) {
+            for (int j = 0; j < NUM_OF_FILES; j++) {
+                fileSystem.createNewFile(new Path(currentPath + "/file_" + j));
+            }
+            return;
+        }
+
+        for (int i = 0; i < NUM_CHILDREN; i++) {
+            String childPath = currentPath.toString() + "/dir_" + i;
+            Path newPath = new Path(childPath);
+            fileSystem.mkdirs(newPath);
+            for (int j = 0; j < NUM_OF_FILES; j++) {
+                fileSystem.createNewFile(new Path(currentPath + "/file_" + j));
+                System.out.println(i + " | " + j + " | File: " + newPath);
+            }
+            System.out.println(i + " | Directory: " + newPath);
+            createTree(fileSystem, newPath, depth + 1);
+        }
+    }
+
+
+    private static FileSystem getFileSystem() {
+        String[] locations = HADOOP_RESOURCE_CONFIG.split(",");
+        Configuration config = new Configuration();
+        for (String resource : locations) {
+            config.addResource(new Path(resource.trim()));
+        }
+        try {
+            return FileSystem.get(config);
+        } catch (IOException e) {
+            throw new UncheckedIOException("Failed to get FileSystem", e);
+        }
+    }
+
+    private static class ListHDFSWithMockedFileSystem extends ListHDFS {
+        private final KerberosProperties testKerberosProps;
+
+        public ListHDFSWithMockedFileSystem(KerberosProperties 
kerberosProperties) {
+            this.testKerberosProps = kerberosProperties;
+        }
+
+        @Override
+        protected KerberosProperties getKerberosProperties(File 
kerberosConfigFile) {
+            return testKerberosProps;
+        }
+    }
+
+}
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/TestFileStatusIterator.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/TestFileStatusIterator.java
new file mode 100644
index 0000000000..7b0bf3da81
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/TestFileStatusIterator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class TestFileStatusIterator {
+    @Mock
+    private FileSystem mockHdfs;
+
+    @Mock
+    private UserGroupInformation mockUserGroupInformation;
+
+    @Mock
+    private FileStatus mockFileStatus1;
+
+    @Mock
+    private FileStatus mockFileStatus2;
+
+    @Mock
+    private FileStatus mockFileStatus3;
+
+    private FileStatusIterable fileStatusIterable;
+
+    @BeforeEach
+    void setup() {
+        fileStatusIterable = new FileStatusIterable(new 
Path("/path/to/files"), false, mockHdfs, mockUserGroupInformation);
+    }
+
+    @Test
+    void pathWithNoFilesShouldReturnEmptyIterator() throws Exception {
+        
when(mockUserGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenReturn(new
 MockRemoteIterator());
+
+        final Iterator<FileStatus> iterator = fileStatusIterable.iterator();
+
+        assertFalse(iterator.hasNext());
+        assertThrows(NoSuchElementException.class, iterator::next);
+    }
+
+    @Test
+    void pathWithMultipleFilesShouldReturnIteratorWithCorrectFiles() throws 
Exception {
+        final FileStatus[] fileStatuses = {mockFileStatus1, mockFileStatus2, 
mockFileStatus3};
+        setupFileStatusMocks(fileStatuses);
+
+
+        final Iterator<FileStatus> iterator = fileStatusIterable.iterator();
+        final Set<FileStatus> expectedFileStatuses = new 
HashSet<>(Arrays.asList(fileStatuses));
+        final Set<FileStatus> actualFileStatuses = new HashSet<>();
+
+        assertTrue(iterator.hasNext());
+        actualFileStatuses.add(iterator.next());
+        assertTrue(iterator.hasNext());
+        actualFileStatuses.add(iterator.next());
+        assertTrue(iterator.hasNext());
+        actualFileStatuses.add(iterator.next());
+
+        assertEquals(expectedFileStatuses, actualFileStatuses);
+
+        assertFalse(iterator.hasNext());
+        assertThrows(NoSuchElementException.class, iterator::next);
+    }
+
+    @Test
+    void getTotalFileCountWithMultipleFilesShouldReturnCorrectCount() throws 
Exception {
+        final FileStatus[] fileStatuses = {mockFileStatus1, mockFileStatus2, 
mockFileStatus3};
+        setupFileStatusMocks(fileStatuses);
+
+        assertEquals(0, fileStatusIterable.getTotalFileCount());
+
+        for (FileStatus ignored : fileStatusIterable) {
+            // iterate to exhaustion; the total file count is expected to go from 0 to 3 as elements are consumed
+        }
+
+        assertEquals(3, fileStatusIterable.getTotalFileCount());
+    }
+
+    private void setupFileStatusMocks(FileStatus[] fileStatuses) throws 
IOException, InterruptedException {
+        when(mockHdfs.listStatusIterator(any(Path.class))).thenReturn(new 
MockRemoteIterator(fileStatuses));
+
+        
when(mockUserGroupInformation.doAs(any(PrivilegedExceptionAction.class))).thenAnswer(invocation
 -> {
+            // Get the provided lambda expression
+            PrivilegedExceptionAction action = invocation.getArgument(0);
+
+            // Invoke the lambda expression and return the result
+            return action.run();
+        });
+    }
+
+    private static class MockRemoteIterator implements 
RemoteIterator<FileStatus> {
+
+        final Deque<FileStatus> deque;
+
+        public MockRemoteIterator(FileStatus... fileStatuses) {
+            deque = new ArrayDeque<>();
+            Collections.addAll(deque, fileStatuses);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return !deque.isEmpty();
+        }
+
+        @Override
+        public FileStatus next() {
+            return deque.pop();
+        }
+    }
+}
+

Reply via email to