http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 361f1ed..7aa534f 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -58,18 +58,15 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 
-/**
- * This processor reads files from HDFS into NiFi FlowFiles.
- */
 @TriggerWhenEmpty
 @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"})
-@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) 
into FlowFiles")
+@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) 
into FlowFiles. This Processor will delete the file from HDFS after fetching 
it.")
 @WritesAttributes({
     @WritesAttribute(attribute = "filename", description = "The name of the 
file that was read from HDFS."),
-    @WritesAttribute(attribute = "path", description = "The path is set to the 
relative path of the file's directory on HDFS. For example, if "
-            + "the Directory property is set to /tmp, then files picked up 
from /tmp will have the path attribute set to \"./\". If the Recurse "
-            + "Subdirectories property is set to true and a file is picked up 
from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".")})
-@SeeAlso(PutHDFS.class)
+    @WritesAttribute(attribute = "path", description = "The path is set to the 
relative path of the file's directory on HDFS. For example, if the Directory 
property "
+            + "is set to /tmp, then files picked up from /tmp will have the 
path attribute set to \"./\". If the Recurse Subdirectories property is set to 
true and "
+            + "a file is picked up from /tmp/abc/1/2/3, then the path 
attribute will be set to \"abc/1/2/3\".") })
+@SeeAlso({PutHDFS.class, ListHDFS.class})
 public class GetHDFS extends AbstractHadoopProcessor {
 
     public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
@@ -78,101 +75,101 @@ public class GetHDFS extends AbstractHadoopProcessor {
 
     // relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("All files retrieved from HDFS are transferred to 
this relationship")
-            .build();
+    .name("success")
+    .description("All files retrieved from HDFS are transferred to this 
relationship")
+    .build();
 
     public static final Relationship REL_PASSTHROUGH = new 
Relationship.Builder()
-            .name("passthrough")
-            .description(
-                    "If this processor has an input queue for some reason, 
then FlowFiles arriving on that input are transferred to this relationship")
+    .name("passthrough")
+    .description(
+            "If this processor has an input queue for some reason, then 
FlowFiles arriving on that input are transferred to this relationship")
             .build();
 
     // properties
     public static final PropertyDescriptor DIRECTORY = new 
PropertyDescriptor.Builder()
-            .name(DIRECTORY_PROP_NAME)
-            .description("The HDFS directory from which files should be read")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
+    .name(DIRECTORY_PROP_NAME)
+    .description("The HDFS directory from which files should be read")
+    .required(true)
+    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+    .build();
 
     public static final PropertyDescriptor RECURSE_SUBDIRS = new 
PropertyDescriptor.Builder()
-            .name("Recurse Subdirectories")
-            .description("Indicates whether to pull files from subdirectories 
of the HDFS directory")
-            .required(true)
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .build();
+    .name("Recurse Subdirectories")
+    .description("Indicates whether to pull files from subdirectories of the 
HDFS directory")
+    .required(true)
+    .allowableValues("true", "false")
+    .defaultValue("true")
+    .build();
 
     public static final PropertyDescriptor KEEP_SOURCE_FILE = new 
PropertyDescriptor.Builder()
-            .name("Keep Source File")
-            .description("Determines whether to delete the file from HDFS 
after it has been successfully transferred")
-            .required(true)
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .build();
+    .name("Keep Source File")
+    .description("Determines whether to delete the file from HDFS after it has 
been successfully transferred. If true, the file will be fetched repeatedly. 
This is intended for testing only.")
+    .required(true)
+    .allowableValues("true", "false")
+    .defaultValue("false")
+    .build();
 
     public static final PropertyDescriptor FILE_FILTER_REGEX = new 
PropertyDescriptor.Builder()
-            .name("File Filter Regex")
-            .description("A Java Regular Expression for filtering Filenames; 
if a filter is supplied then only files whose names match that Regular "
-                    + "Expression will be fetched, otherwise all files will be 
fetched")
+    .name("File Filter Regex")
+    .description("A Java Regular Expression for filtering Filenames; if a 
filter is supplied then only files whose names match that Regular "
+            + "Expression will be fetched, otherwise all files will be 
fetched")
             .required(false)
             .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor FILTER_MATCH_NAME_ONLY = new 
PropertyDescriptor.Builder()
-            .name("Filter Match Name Only")
-            .description("If true then File Filter Regex will match on just 
the filename, otherwise subdirectory names will be included with filename "
-                    + "in the regex comparison")
+    .name("Filter Match Name Only")
+    .description("If true then File Filter Regex will match on just the 
filename, otherwise subdirectory names will be included with filename "
+            + "in the regex comparison")
             .required(true)
             .allowableValues("true", "false")
             .defaultValue("true")
             .build();
 
     public static final PropertyDescriptor IGNORE_DOTTED_FILES = new 
PropertyDescriptor.Builder()
-            .name("Ignore Dotted Files")
-            .description("If true, files whose names begin with a dot (\".\") 
will be ignored")
-            .required(true)
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .build();
+    .name("Ignore Dotted Files")
+    .description("If true, files whose names begin with a dot (\".\") will be 
ignored")
+    .required(true)
+    .allowableValues("true", "false")
+    .defaultValue("true")
+    .build();
 
     public static final PropertyDescriptor MIN_AGE = new 
PropertyDescriptor.Builder()
-            .name("Minimum File Age")
-            .description("The minimum age that a file must be in order to be 
pulled; any file younger than this amount of time (based on last modification 
date) will be ignored")
-            .required(true)
-            .addValidator(StandardValidators.createTimePeriodValidator(0, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
-            .defaultValue("0 sec")
-            .build();
+    .name("Minimum File Age")
+    .description("The minimum age that a file must be in order to be pulled; 
any file younger than this amount of time (based on last modification date) 
will be ignored")
+    .required(true)
+    .addValidator(StandardValidators.createTimePeriodValidator(0, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+    .defaultValue("0 sec")
+    .build();
 
     public static final PropertyDescriptor MAX_AGE = new 
PropertyDescriptor.Builder()
-            .name("Maximum File Age")
-            .description("The maximum age that a file must be in order to be 
pulled; any file older than this amount of time (based on last modification 
date) will be ignored")
-            .required(false)
-            .addValidator(StandardValidators.createTimePeriodValidator(100, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
-            .build();
+    .name("Maximum File Age")
+    .description("The maximum age that a file must be in order to be pulled; 
any file older than this amount of time (based on last modification date) will 
be ignored")
+    .required(false)
+    .addValidator(StandardValidators.createTimePeriodValidator(100, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+    .build();
 
     public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Batch Size")
-            .description("The maximum number of files to pull in each 
iteration, based on run schedule.")
-            .required(true)
-            .defaultValue("100")
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
+    .name("Batch Size")
+    .description("The maximum number of files to pull in each iteration, based 
on run schedule.")
+    .required(true)
+    .defaultValue("100")
+    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+    .build();
 
     public static final PropertyDescriptor POLLING_INTERVAL = new 
PropertyDescriptor.Builder()
-            .name("Polling Interval")
-            .description("Indicates how long to wait between performing 
directory listings")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .defaultValue("0 sec")
-            .build();
+    .name("Polling Interval")
+    .description("Indicates how long to wait between performing directory 
listings")
+    .required(true)
+    .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+    .defaultValue("0 sec")
+    .build();
 
     public static final PropertyDescriptor BUFFER_SIZE = new 
PropertyDescriptor.Builder()
-            .name("IO Buffer Size")
-            .description("Amount of memory to use to buffer file contents 
during IO. This overrides the Hadoop Configuration")
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .build();
+    .name("IO Buffer Size")
+    .description("Amount of memory to use to buffer file contents during IO. 
This overrides the Hadoop Configuration")
+    .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+    .build();
 
     private static final Set<Relationship> relationships;
     protected static final List<PropertyDescriptor> localProperties;
@@ -239,8 +236,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
         abstractOnScheduled(context);
         // copy configuration values to pass them around cleanly
         processorConfig = new ProcessorConfiguration(context);
-        FileSystem fs = hdfsResources.get().getValue();
-        Path dir = new Path(context.getProperty(DIRECTORY).getValue());
+        final FileSystem fs = getFileSystem();
+        final Path dir = new Path(context.getProperty(DIRECTORY).getValue());
         if (!fs.exists(dir)) {
             throw new IOException("PropertyDescriptor " + DIRECTORY + " has 
invalid value " + dir + ". The directory does not exist.");
         }
@@ -333,8 +330,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
     protected void processBatchOfFiles(final List<Path> files, final 
ProcessContext context, final ProcessSession session) {
         // process the batch of files
         FSDataInputStream stream = null;
-        Configuration conf = hdfsResources.get().getKey();
-        FileSystem hdfs = hdfsResources.get().getValue();
+        Configuration conf = getConfiguration();
+        FileSystem hdfs = getFileSystem();
         final boolean keepSourceFiles = 
context.getProperty(KEEP_SOURCE_FILE).asBoolean();
         final Double bufferSizeProp = 
context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
         int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : 
conf.getInt(BUFFER_SIZE_KEY,
@@ -401,7 +398,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
 
         if (System.currentTimeMillis() >= nextPollTime && 
listingLock.tryLock()) {
             try {
-                final FileSystem hdfs = hdfsResources.get().getValue();
+                final FileSystem hdfs = getFileSystem();
                 // get listing
                 listing = selectFiles(hdfs, 
processorConfig.getConfiguredRootDirPath(), null);
                 lastPollTime.set(System.currentTimeMillis());
@@ -464,33 +461,6 @@ public class GetHDFS extends AbstractHadoopProcessor {
     }
 
     /**
-     * Returns the relative path of the child that does not include the 
filename or the root path.
-     *
-     * @param root root
-     * @param child child
-     * @return the relative path of the child that does not include the 
filename or the root path
-     */
-    public static String getPathDifference(final Path root, final Path child) {
-        final int depthDiff = child.depth() - root.depth();
-        if (depthDiff <= 1) {
-            return "".intern();
-        }
-        String lastRoot = root.getName();
-        Path childsParent = child.getParent();
-        final StringBuilder builder = new StringBuilder();
-        builder.append(childsParent.getName());
-        for (int i = (depthDiff - 3); i >= 0; i--) {
-            childsParent = childsParent.getParent();
-            String name = childsParent.getName();
-            if (name.equals(lastRoot) && 
childsParent.toString().endsWith(root.toString())) {
-                break;
-            }
-            builder.insert(0, Path.SEPARATOR).insert(0, name);
-        }
-        return builder.toString();
-    }
-
-    /**
      * Holder for a snapshot in time of some processor properties that are 
passed around.
      */
     protected static class ProcessorConfiguration {
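
The hunks above (and the matching changes in GetHDFSSequenceFile and PutHDFS below) replace direct unpacking of the hdfsResources Tuple with getConfiguration() and getFileSystem() accessors. The change to AbstractHadoopProcessor itself is not part of this excerpt, so the following is only a minimal sketch of what such accessors could look like, assuming the same AtomicReference<Tuple<Configuration, FileSystem>> field that the removed lines were reading; the class name is illustrative.

    import java.util.concurrent.atomic.AtomicReference;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.nifi.util.Tuple;

    public abstract class HadoopResourceAccessorSketch {

        // Populated at schedule time; the removed code read this reference directly.
        private final AtomicReference<Tuple<Configuration, FileSystem>> hdfsResources =
                new AtomicReference<>();

        // Returns the Hadoop Configuration, or null if the processor has not been scheduled yet.
        protected Configuration getConfiguration() {
            final Tuple<Configuration, FileSystem> resources = hdfsResources.get();
            return resources == null ? null : resources.getKey();
        }

        // Returns the FileSystem, or null if the processor has not been scheduled yet.
        protected FileSystem getFileSystem() {
            final Tuple<Configuration, FileSystem> resources = hdfsResources.get();
            return resources == null ? null : resources.getValue();
        }
    }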

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
index 22ba36b..f032ee4 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
@@ -22,6 +22,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -34,10 +37,6 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
 import org.apache.nifi.util.StopWatch;
-import org.apache.nifi.util.Tuple;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 /**
  * This processor is used to pull files from HDFS. The files being pulled in 
MUST be SequenceFile formatted files. The processor creates a flow file for 
each key/value entry in the ingested
@@ -80,9 +79,8 @@ public class GetHDFSSequenceFile extends GetHDFS {
 
     @Override
     protected void processBatchOfFiles(final List<Path> files, final 
ProcessContext context, final ProcessSession session) {
-        final Tuple<Configuration, FileSystem> hadoopResources = 
hdfsResources.get();
-        final Configuration conf = hadoopResources.getKey();
-        final FileSystem hdfs = hadoopResources.getValue();
+        final Configuration conf = getConfiguration();
+        final FileSystem hdfs = getFileSystem();
         final String flowFileContentValue = 
context.getProperty(FLOWFILE_CONTENT).getValue();
         final boolean keepSourceFiles = 
context.getProperty(KEEP_SOURCE_FILE).asBoolean();
         final Double bufferSizeProp = 
context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
new file mode 100644
index 0000000..151cbf2
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.HDFSListing;
+import org.apache.nifi.processors.hadoop.util.StringSerDe;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
+@CapabilityDescription("Retrieves a listing of files from HDFS. For each file 
that is listed in HDFS, creates a FlowFile that represents "
+        + "the HDFS file so that it can be fetched in conjunction with 
FetchHDFS. This Processor is designed to run on Primary Node only "
+        + "in a cluster. If the primary node changes, the new Primary Node 
will pick up where the previous node left off without duplicating "
+        + "all of the data. Unlike GetHDFS, this Processor does not delete any 
data from HDFS.")
+@WritesAttributes({
+    @WritesAttribute(attribute="filename", description="The name of the file 
that was read from HDFS."),
+    @WritesAttribute(attribute="path", description="The path is set to the 
absolute path of the file's directory on HDFS. For example, if the Directory 
property is set to /tmp, "
+            + "then files picked up from /tmp will have the path attribute set 
to \"./\". If the Recurse Subdirectories property is set to true and a file is 
picked up "
+            + "from /tmp/abc/1/2/3, then the path attribute will be set to 
\"/tmp/abc/1/2/3\"."),
+    @WritesAttribute(attribute="hdfs.owner", description="The user that owns 
the file in HDFS"),
+    @WritesAttribute(attribute="hdfs.group", description="The group that owns 
the file in HDFS"),
+    @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp 
of when the file in HDFS was last modified, as milliseconds since midnight Jan 
1, 1970 UTC"),
+    @WritesAttribute(attribute="hdfs.length", description="The number of bytes 
in the file in HDFS"),
+    @WritesAttribute(attribute="hdfs.replication", description="The number of 
HDFS replicas for the file"),
+    @WritesAttribute(attribute="hdfs.permissions", description="The 
permissions for the file in HDFS. This is formatted as 3 characters for the 
owner, "
+            + "3 for the group, and 3 for other users. For example rw-rw-r--")
+})
+@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
+public class ListHDFS extends AbstractHadoopProcessor {
+    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new 
PropertyDescriptor.Builder()
+        .name("Distributed Cache Service")
+        .description("Specifies the Controller Service that should be used to 
maintain state about what has been pulled from HDFS so that if a new node "
+                + "begins pulling data, it won't duplicate all of the work 
that has been done.")
+        .required(true)
+        .identifiesControllerService(DistributedMapCacheClient.class)
+        .build();
+
+    public static final PropertyDescriptor DIRECTORY = new 
PropertyDescriptor.Builder()
+        .name(DIRECTORY_PROP_NAME)
+        .description("The HDFS directory from which files should be read")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor RECURSE_SUBDIRS = new 
PropertyDescriptor.Builder()
+        .name("Recurse Subdirectories")
+        .description("Indicates whether to list files from subdirectories of 
the HDFS directory")
+        .required(true)
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .build();
+
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles are transferred to this relationship")
+        .build();
+
+    private volatile Long lastListingTime = null;
+    private volatile Set<Path> latestPathsListed = new HashSet<>();
+    private volatile boolean electedPrimaryNode = false;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        super.init(context);
+    }
+
+    protected File getPersistenceFile() {
+        return new File("conf/state/" + getIdentifier());
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HADOOP_CONFIGURATION_RESOURCES);
+        properties.add(DISTRIBUTED_CACHE_SERVICE);
+        properties.add(DIRECTORY);
+        properties.add(RECURSE_SUBDIRS);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        return relationships;
+    }
+
+    protected String getKey(final String directory) {
+        return getIdentifier() + ".lastListingTime." + directory;
+    }
+
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
+        if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) {
+            electedPrimaryNode = true;
+        }
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        if ( descriptor.equals(DIRECTORY) ) {
+            lastListingTime = null; // clear lastListingTime so that we have 
to fetch new time
+            latestPathsListed = new HashSet<>();
+        }
+    }
+
+    private HDFSListing deserialize(final String serializedState) throws 
JsonParseException, JsonMappingException, IOException {
+        final ObjectMapper mapper = new ObjectMapper();
+        final JsonNode jsonNode = mapper.readTree(serializedState);
+        return mapper.readValue(jsonNode, HDFSListing.class);
+    }
+
+
+    private Long getMinTimestamp(final String directory, final 
DistributedMapCacheClient client) throws IOException {
+        // Determine the timestamp for the last file that we've listed.
+        Long minTimestamp = lastListingTime;
+        if ( minTimestamp == null || electedPrimaryNode ) {
+            // We haven't yet restored any state from local or distributed storage,
+            // or this node has just been elected Primary Node. In this case,
+            // first attempt to get the timestamp from the distributed cache service.
+            try {
+                final StringSerDe serde = new StringSerDe();
+                final String serializedState = client.get(getKey(directory), 
serde, serde);
+                if ( serializedState == null || serializedState.isEmpty() ) {
+                    minTimestamp = null;
+                    this.latestPathsListed = Collections.emptySet();
+                } else {
+                    final HDFSListing listing = deserialize(serializedState);
+                    this.lastListingTime = 
listing.getLatestTimestamp().getTime();
+                    minTimestamp = listing.getLatestTimestamp().getTime();
+                    this.latestPathsListed = listing.toPaths();
+                }
+
+                this.lastListingTime = minTimestamp;
+                electedPrimaryNode = false; // no requirement to pull an 
update from the distributed cache anymore.
+            } catch (final IOException ioe) {
+                throw ioe;
+            }
+
+            // Check the persistence file. We want to use the latest timestamp 
that we have so that
+            // we don't duplicate data.
+            try {
+                final File persistenceFile = getPersistenceFile();
+                if ( persistenceFile.exists() ) {
+                    try (final FileInputStream fis = new 
FileInputStream(persistenceFile)) {
+                        final Properties props = new Properties();
+                        props.load(fis);
+
+                        // get the local timestamp for this directory, if it 
exists.
+                        final String locallyPersistedValue = 
props.getProperty(directory);
+                        if ( locallyPersistedValue != null ) {
+                            final HDFSListing listing = 
deserialize(locallyPersistedValue);
+                            final long localTimestamp = 
listing.getLatestTimestamp().getTime();
+
+                            // If distributed state doesn't have an entry or 
the local entry is later than the distributed state,
+                            // update the distributed state so that we are in 
sync.
+                            if (minTimestamp == null || localTimestamp > 
minTimestamp) {
+                                minTimestamp = localTimestamp;
+
+                                // Our local persistence file shows a later 
time than the Distributed service.
+                                // Update the distributed service to match our 
local state.
+                                try {
+                                    final StringSerDe serde = new 
StringSerDe();
+                                    client.put(getKey(directory), 
locallyPersistedValue, serde, serde);
+                                } catch (final IOException ioe) {
+                                    getLogger().warn("Local timestamp for {} 
is {}, which is later than Distributed state but failed to update Distributed "
+                                            + "state due to {}. If a new node 
performs HDFS Listing, data duplication may occur",
+                                            new Object[] {directory, 
locallyPersistedValue, ioe});
+                                }
+                            }
+                        }
+                    }
+                }
+            } catch (final IOException ioe) {
+                getLogger().warn("Failed to recover local state due to {}. 
Assuming that the state from the distributed cache is correct.", ioe);
+            }
+        }
+
+        return minTimestamp;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final String directory = context.getProperty(DIRECTORY).getValue();
+        final DistributedMapCacheClient client = 
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+
+        final Long minTimestamp;
+        try {
+            minTimestamp = getMinTimestamp(directory, client);
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to retrieve timestamp of last listing 
from Distributed Cache Service. Will not perform listing until this is 
accomplished.");
+            context.yield();
+            return;
+        }
+
+        // Pull in any file that is newer than the timestamp that we have.
+        final FileSystem hdfs = getFileSystem();
+        final boolean recursive = 
context.getProperty(RECURSE_SUBDIRS).asBoolean();
+        final Path rootPath = new Path(directory);
+
+        int listCount = 0;
+        Long latestListingModTime = null;
+        final Set<FileStatus> statuses;
+        try {
+            statuses = getStatuses(rootPath, recursive, hdfs);
+            for ( final FileStatus status : statuses ) {
+                // don't get anything where the last modified timestamp is 
equal to our current timestamp.
+                // if we do, then we run the risk of multiple files having the 
same last mod date but us only
+                // seeing a portion of them.
+                // I.e., there could be 5 files with last mod date = (now). 
But if we do the listing now, maybe
+                // only 2 exist and 3 more will exist later in this 
millisecond. So we ignore anything with a
+                // modified date not before the current time.
+                final long fileModTime = status.getModificationTime();
+
+                // we only want the file if its timestamp is later than the 
minTimestamp or equal to and we didn't pull it last time.
+                // Also, HDFS creates files with the suffix _COPYING_ when 
they are being written - we want to ignore those.
+                boolean fetch = 
!status.getPath().getName().endsWith("_COPYING_")
+                        && (minTimestamp == null || fileModTime > minTimestamp 
|| (fileModTime == minTimestamp && 
!latestPathsListed.contains(status.getPath())));
+
+                // Create the FlowFile for this path.
+                if ( fetch ) {
+                    final Map<String, String> attributes = 
createAttributes(status);
+                    FlowFile flowFile = session.create();
+                    flowFile = session.putAllAttributes(flowFile, attributes);
+                    session.transfer(flowFile, REL_SUCCESS);
+                    listCount++;
+
+                    if ( latestListingModTime == null || fileModTime > 
latestListingModTime ) {
+                        latestListingModTime = fileModTime;
+                    }
+                }
+            }
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to perform listing of HDFS due to {}", 
new Object[] {ioe});
+            return;
+        }
+
+        if ( listCount > 0 ) {
+            getLogger().info("Successfully created listing with {} new files 
from HDFS", new Object[] {listCount});
+            session.commit();
+
+            // We have performed a listing and pushed the FlowFiles out.
+            // Now, we need to persist state about the Last Modified timestamp 
of the newest file
+            // that we pulled in. We do this in order to avoid pulling in the 
same file twice.
+            // However, we want to save the state both locally and remotely.
+            // We store the state remotely so that if a new Primary Node is 
chosen, it can pick up where the
+            // previous Primary Node left off.
+            // We also store the state locally so that if the node is 
restarted, and the node cannot contact
+            // the distributed state cache, the node can continue to run (if 
it is primary node).
+            String serializedState = null;
+            try {
+                serializedState = serializeState(latestListingModTime, 
statuses);
+            } catch (final Exception e) {
+                getLogger().error("Failed to serialize state due to {}", new 
Object[] {e});
+            }
+
+            if ( serializedState != null ) {
+                // Save our state locally.
+                try {
+                    persistLocalState(directory, serializedState);
+                } catch (final IOException ioe) {
+                    getLogger().warn("Unable to save state locally. If the 
node is restarted now, data may be duplicated. Failure is due to {}", ioe);
+                }
+
+                // Attempt to save state to remote server.
+                try {
+                    client.put(getKey(directory), serializedState, new 
StringSerDe(), new StringSerDe());
+                } catch (final IOException ioe) {
+                    getLogger().warn("Unable to communicate with distributed 
cache server due to {}. Persisting state locally instead.", ioe);
+                }
+            }
+
+            lastListingTime = latestListingModTime;
+        } else {
+            getLogger().debug("There is no data to list. Yielding.");
+            context.yield();
+
+            // lastListingTime = 0 so that we don't continually poll the 
distributed cache / local file system
+            if ( lastListingTime == null ) {
+                lastListingTime = 0L;
+            }
+
+            return;
+        }
+    }
+
+    private Set<FileStatus> getStatuses(final Path path, final boolean 
recursive, final FileSystem hdfs) throws IOException {
+        final Set<FileStatus> statusSet = new HashSet<>();
+
+        final FileStatus[] statuses = hdfs.listStatus(path);
+
+        for ( final FileStatus status : statuses ) {
+            if ( status.isDirectory() ) {
+                if ( recursive ) {
+                    try {
+                        statusSet.addAll(getStatuses(status.getPath(), 
recursive, hdfs));
+                    } catch (final IOException ioe) {
+                        getLogger().error("Failed to retrieve HDFS listing for 
subdirectory {} due to {}; will continue listing others", new Object[] 
{status.getPath(), ioe});
+                    }
+                }
+            } else {
+                statusSet.add(status);
+            }
+        }
+
+        return statusSet;
+    }
+
+
+    private String serializeState(final long latestListingTime, final 
Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, 
IOException {
+        // we need to keep track of all files that we pulled in that had a 
modification time equal to
+        // lastListingTime so that we can avoid pulling those files in again. 
We can't just ignore any files
+        // that have a mod time equal to that timestamp because more files may 
come in with the same timestamp
+        // later in the same millisecond.
+        if ( statuses.isEmpty() ) {
+            return null;
+        } else {
+            final List<FileStatus> sortedStatuses = new ArrayList<>(statuses);
+            Collections.sort(sortedStatuses, new Comparator<FileStatus>() {
+                @Override
+                public int compare(final FileStatus o1, final FileStatus o2) {
+                    return Long.compare(o1.getModificationTime(), 
o2.getModificationTime());
+                }
+            });
+
+            final long latestListingModTime = 
sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime();
+            final Set<Path> pathsWithModTimeEqualToListingModTime = new 
HashSet<>();
+            for (int i=sortedStatuses.size() - 1; i >= 0; i--) {
+                final FileStatus status = sortedStatuses.get(i);
+                if (status.getModificationTime() == latestListingModTime) {
+                    
pathsWithModTimeEqualToListingModTime.add(status.getPath());
+                }
+            }
+
+            this.latestPathsListed = pathsWithModTimeEqualToListingModTime;
+
+            final HDFSListing listing = new HDFSListing();
+            listing.setLatestTimestamp(new Date(latestListingModTime));
+            final Set<String> paths = new HashSet<>();
+            for ( final Path path : pathsWithModTimeEqualToListingModTime ) {
+                paths.add(path.toUri().toString());
+            }
+            listing.setMatchingPaths(paths);
+
+            final ObjectMapper mapper = new ObjectMapper();
+            final String serializedState = 
mapper.writerWithType(HDFSListing.class).writeValueAsString(listing);
+            return serializedState;
+        }
+    }
+
+    protected void persistLocalState(final String directory, final String 
serializedState) throws IOException {
+        // we need to keep track of all files that we pulled in that had a 
modification time equal to
+        // lastListingTime so that we can avoid pulling those files in again. 
We can't just ignore any files
+        // that have a mod time equal to that timestamp because more files may 
come in with the same timestamp
+        // later in the same millisecond.
+        final File persistenceFile = getPersistenceFile();
+        final File dir = persistenceFile.getParentFile();
+        if ( !dir.exists() && !dir.mkdirs() ) {
+            throw new IOException("Could not create directory " + 
dir.getAbsolutePath() + " in order to save local state");
+        }
+
+        final Properties props = new Properties();
+        if ( persistenceFile.exists() ) {
+            try (final FileInputStream fis = new 
FileInputStream(persistenceFile)) {
+                props.load(fis);
+            }
+        }
+
+        props.setProperty(directory, serializedState);
+
+        try (final FileOutputStream fos = new 
FileOutputStream(persistenceFile)) {
+            props.store(fos, null);
+        }
+    }
+
+    private String getAbsolutePath(final Path path) {
+        final Path parent = path.getParent();
+        final String prefix = (parent == null || parent.getName().equals("")) 
? "" : getAbsolutePath(parent);
+        return prefix + "/" + path.getName();
+    }
+
+    private Map<String, String> createAttributes(final FileStatus status) {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), 
status.getPath().getName());
+        attributes.put(CoreAttributes.PATH.key(), 
getAbsolutePath(status.getPath().getParent()));
+
+        attributes.put("hdfs.owner", status.getOwner());
+        attributes.put("hdfs.group", status.getGroup());
+        attributes.put("hdfs.lastModified", 
String.valueOf(status.getModificationTime()));
+        attributes.put("hdfs.length", String.valueOf(status.getLen()));
+        attributes.put("hdfs.replication", 
String.valueOf(status.getReplication()));
+
+        final FsPermission permission = status.getPermission();
+        final String perms = getPerms(permission.getUserAction()) + 
getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
+        attributes.put("hdfs.permissions", perms);
+        return attributes;
+    }
+
+    private String getPerms(final FsAction action) {
+        final StringBuilder sb = new StringBuilder();
+        if ( action.implies(FsAction.READ) ) {
+            sb.append("r");
+        } else {
+            sb.append("-");
+        }
+
+        if ( action.implies(FsAction.WRITE) ) {
+            sb.append("w");
+        } else {
+            sb.append("-");
+        }
+
+        if ( action.implies(FsAction.EXECUTE) ) {
+            sb.append("x");
+        } else {
+            sb.append("-");
+        }
+
+        return sb.toString();
+    }
+}
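
ListHDFS tracks listing state in a DistributedMapCacheClient under the key returned by getKey(directory), serializing values with StringSerDe (added later in this commit). Below is a standalone sketch of that interaction, mirroring the client.put(...) and client.get(...) calls shown above; the helper class and method names are illustrative, and a real DistributedMapCacheClient instance would come from the ProcessContext as in onTrigger().

    import java.io.IOException;

    import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    import org.apache.nifi.processors.hadoop.util.StringSerDe;

    public final class ListingStateCacheSketch {

        private ListingStateCacheSketch() {
        }

        // Build the cache key the same way ListHDFS.getKey(...) does.
        public static String key(final String processorId, final String directory) {
            return processorId + ".lastListingTime." + directory;
        }

        // Persist serialized listing state so another node can resume the listing.
        public static void save(final DistributedMapCacheClient client, final String processorId,
                final String directory, final String serializedState) throws IOException {
            final StringSerDe serde = new StringSerDe();
            client.put(key(processorId, directory), serializedState, serde, serde);
        }

        // Returns the previously stored state, or null (or an empty string) when nothing is recorded.
        public static String load(final DistributedMapCacheClient client, final String processorId,
                final String directory) throws IOException {
            final StringSerDe serde = new StringSerDe();
            return client.get(key(processorId, directory), serde, serde);
        }
    }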

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 057f786..52cf475 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
@@ -54,7 +54,6 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
-import org.apache.nifi.util.Tuple;
 
 /**
  * This processor copies FlowFiles to HDFS.
@@ -183,8 +182,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
         } else {
             dfsUmask = FsPermission.DEFAULT_UMASK;
         }
-        final Tuple<Configuration, FileSystem> resources = hdfsResources.get();
-        final Configuration conf = resources.getKey();
+        final Configuration conf = getConfiguration();
         FsPermission.setUMask(conf, new FsPermission(dfsUmask));
     }
 
@@ -195,26 +193,23 @@ public class PutHDFS extends AbstractHadoopProcessor {
             return;
         }
 
-        final Tuple<Configuration, FileSystem> resources = hdfsResources.get();
-        if (resources == null || resources.getKey() == null || 
resources.getValue() == null) {
+        final Configuration configuration = getConfiguration();
+        final FileSystem hdfs = getFileSystem();
+        if (configuration == null || hdfs == null) {
             getLogger().error("HDFS not configured properly");
             session.transfer(flowFile, REL_FAILURE);
             context.yield();
             return;
         }
-        final Configuration conf = resources.getKey();
-        final FileSystem hdfs = resources.getValue();
 
-        final Path configuredRootDirPath = new 
Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile)
-                .getValue());
+        final Path configuredRootDirPath = new 
Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue());
         final String conflictResponse = 
context.getProperty(CONFLICT_RESOLUTION).getValue();
 
         final Double blockSizeProp = 
context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
         final long blockSize = blockSizeProp != null ? 
blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
 
         final Double bufferSizeProp = 
context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
-        final int bufferSize = bufferSizeProp != null ? 
bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
-                BUFFER_SIZE_DEFAULT);
+        final int bufferSize = bufferSizeProp != null ? 
bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, 
BUFFER_SIZE_DEFAULT);
 
         final Integer replicationProp = 
context.getProperty(REPLICATION_FACTOR).asInteger();
         final short replication = replicationProp != null ? 
replicationProp.shortValue() : hdfs
@@ -230,7 +225,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
 
             // Create destination directory if it does not exist
             try {
-                if (!hdfs.getFileStatus(configuredRootDirPath).isDir()) {
+                if (!hdfs.getFileStatus(configuredRootDirPath).isDirectory()) {
                     throw new IOException(configuredRootDirPath.toString() + " 
already exists and is not a directory");
                 }
             } catch (FileNotFoundException fe) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
new file mode 100644
index 0000000..49957f5
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.xml.bind.annotation.XmlTransient;
+import javax.xml.bind.annotation.XmlType;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A simple POJO for maintaining state about the last HDFS Listing that was 
performed so that
+ * we can avoid pulling the same file multiple times
+ */
+@XmlType(name = "listing")
+public class HDFSListing {
+    private Date latestTimestamp;
+    private Collection<String> matchingPaths;
+
+    /**
+     * @return the modification date of the newest file that was contained in 
the HDFS Listing
+     */
+    public Date getLatestTimestamp() {
+        return latestTimestamp;
+    }
+
+    /**
+     * Sets the timestamp of the modification date of the newest file that was 
contained in the HDFS Listing
+     *
+     * @param latestTimestamp the timestamp of the modification date of the 
newest file that was contained in the HDFS Listing
+     */
+    public void setLatestTimestamp(Date latestTimestamp) {
+        this.latestTimestamp = latestTimestamp;
+    }
+
+    /**
+     * @return a Collection containing the paths of all files in the HDFS 
Listing whose Modification date
+     * was equal to {@link #getLatestTimestamp()}
+     */
+    @XmlTransient
+    public Collection<String> getMatchingPaths() {
+        return matchingPaths;
+    }
+
+    /**
+     * @return a Collection of {@link Path} objects equivalent to those 
returned by {@link #getMatchingPaths()}
+     */
+    public Set<Path> toPaths() {
+        final Set<Path> paths = new HashSet<>(matchingPaths.size());
+        for ( final String pathname : matchingPaths ) {
+            paths.add(new Path(pathname));
+        }
+        return paths;
+    }
+
+    /**
+     * Sets the Collection containing the paths of all files in the HDFS 
Listing whose Modification Date was
+     * equal to {@link #getLatestTimestamp()}
+     * @param matchingPaths the paths that have last modified date matching 
the latest timestamp
+     */
+    public void setMatchingPaths(Collection<String> matchingPaths) {
+        this.matchingPaths = matchingPaths;
+    }
+
+}
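
HDFSListing is the POJO that ListHDFS serializes to JSON for both the local persistence file and the distributed cache. Here is a minimal round-trip sketch using the same Codehaus Jackson ObjectMapper calls as ListHDFS.serializeState(...) and deserialize(...) above; the wrapper class and method names are illustrative.

    import java.io.IOException;
    import java.util.Collections;
    import java.util.Date;

    import org.apache.nifi.processors.hadoop.util.HDFSListing;
    import org.codehaus.jackson.JsonNode;
    import org.codehaus.jackson.map.ObjectMapper;

    public final class HdfsListingJsonSketch {

        private HdfsListingJsonSketch() {
        }

        // Serialize a listing the way ListHDFS.serializeState(...) does.
        public static String toJson(final Date latestTimestamp, final String path) throws IOException {
            final HDFSListing listing = new HDFSListing();
            listing.setLatestTimestamp(latestTimestamp);
            listing.setMatchingPaths(Collections.singleton(path));
            return new ObjectMapper().writerWithType(HDFSListing.class).writeValueAsString(listing);
        }

        // Deserialize it the way ListHDFS.deserialize(...) does.
        public static HDFSListing fromJson(final String json) throws IOException {
            final ObjectMapper mapper = new ObjectMapper();
            final JsonNode node = mapper.readTree(json);
            return mapper.readValue(node, HDFSListing.class);
        }
    }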

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
new file mode 100644
index 0000000..229f26c
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import 
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import 
org.apache.nifi.distributed.cache.client.exception.SerializationException;
+
+public class LongSerDe implements Serializer<Long>, Deserializer<Long> {
+
+    @Override
+    public Long deserialize(final byte[] input) throws 
DeserializationException, IOException {
+        if ( input == null || input.length == 0 ) {
+            return null;
+        }
+
+        final DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(input));
+        return dis.readLong();
+    }
+
+    @Override
+    public void serialize(final Long value, final OutputStream out) throws 
SerializationException, IOException {
+        final DataOutputStream dos = new DataOutputStream(out);
+        dos.writeLong(value);
+    }
+
+}
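
LongSerDe is not exercised elsewhere in this excerpt; the following is a minimal round-trip sketch, assuming only the class as shown above (the driver class name is illustrative).

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.nifi.processors.hadoop.util.LongSerDe;

    public final class LongSerDeRoundTripSketch {

        public static void main(final String[] args) throws IOException {
            final LongSerDe serde = new LongSerDe();
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();

            // serialize(...) writes the value as an 8-byte big-endian long.
            serde.serialize(System.currentTimeMillis(), baos);

            // deserialize(...) returns null for empty input, otherwise reads the long back.
            final Long restored = serde.deserialize(baos.toByteArray());
            System.out.println("restored timestamp: " + restored);
        }
    }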

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
new file mode 100644
index 0000000..ca1c548
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import 
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import 
org.apache.nifi.distributed.cache.client.exception.SerializationException;
+
+public class StringSerDe implements Serializer<String>, Deserializer<String> {
+
+    @Override
+    public String deserialize(final byte[] value) throws 
DeserializationException, IOException {
+        if ( value == null ) {
+            return null;
+        }
+
+        return new String(value, StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public void serialize(final String value, final OutputStream out) throws 
SerializationException, IOException {
+        out.write(value.getBytes(StandardCharsets.UTF_8));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index da16ef7..4b359e8 100644
--- 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,7 +12,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.processors.hadoop.GetHDFS
-org.apache.nifi.processors.hadoop.PutHDFS
 org.apache.nifi.processors.hadoop.CreateHadoopSequenceFile
+org.apache.nifi.processors.hadoop.FetchHDFS
+org.apache.nifi.processors.hadoop.GetHDFS
 org.apache.nifi.processors.hadoop.GetHDFSSequenceFile
+org.apache.nifi.processors.hadoop.ListHDFS
+org.apache.nifi.processors.hadoop.PutHDFS
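
This META-INF/services file is a standard java.util.ServiceLoader descriptor, so adding the FetchHDFS and ListHDFS lines is what makes the new processors discoverable at runtime. A small sketch of how such a descriptor is consumed; NiFi's actual extension loading goes through its NAR class loaders, so treat this as an illustration of the general mechanism rather than NiFi's exact code path:

    import java.util.ServiceLoader;

    import org.apache.nifi.processor.Processor;

    public class ProcessorDiscoverySketch {
        public static void main(final String[] args) {
            // ServiceLoader reads META-INF/services/org.apache.nifi.processor.Processor
            // from the classpath and instantiates each implementation listed there.
            for (final Processor processor : ServiceLoader.load(Processor.class)) {
                System.out.println(processor.getClass().getName());
            }
        }
    }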

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
new file mode 100644
index 0000000..5822fc5
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestListHDFS {
+
+    private TestRunner runner;
+    private ListHDFSWithMockedFileSystem proc;
+    private MockCacheClient service;
+
+    @Before
+    public void setup() throws InitializationException {
+        proc = new ListHDFSWithMockedFileSystem();
+        runner = TestRunners.newTestRunner(proc);
+
+        service = new MockCacheClient();
+        runner.addControllerService("service", service);
+        runner.enableControllerService(service);
+
+        runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, 
"src/test/resources/core-site.xml");
+        runner.setProperty(ListHDFS.DIRECTORY, "/test");
+        runner.setProperty(ListHDFS.DISTRIBUTED_CACHE_SERVICE, "service");
+    }
+
+    @Test
+    public void testListingHasCorrectAttributes() {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+        final MockFlowFile mff = 
runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff.assertAttributeEquals("path", "/test");
+        mff.assertAttributeEquals("filename", "testFile.txt");
+    }
+
+
+    @Test
+    public void testRecursive() {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new 
FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/1.txt")));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
+        for (int i=0; i < 2; i++) {
+            final MockFlowFile ff = flowFiles.get(i);
+            final String filename = ff.getAttribute("filename");
+
+            if (filename.equals("testFile.txt")) {
+                ff.assertAttributeEquals("path", "/test");
+            } else if ( filename.equals("1.txt")) {
+                ff.assertAttributeEquals("path", "/test/testDir");
+            } else {
+                Assert.fail("filename was " + filename);
+            }
+        }
+    }
+
+    @Test
+    public void testNotRecursive() {
+        runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new 
FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testDir/1.txt")));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+        final MockFlowFile mff1 = 
runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff1.assertAttributeEquals("path", "/test");
+        mff1.assertAttributeEquals("filename", "testFile.txt");
+    }
+
+
+    @Test
+    public void testNoListUntilUpdateFromRemoteOnPrimaryNodeChange() {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile.txt")));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+        final MockFlowFile mff1 = 
runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff1.assertAttributeEquals("path", "/test");
+        mff1.assertAttributeEquals("filename", "testFile.txt");
+
+        runner.clearTransferState();
+
+        // add new file to pull
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 0L, 0L, create777(), "owner", "group", new 
Path("/test/testFile2.txt")));
+
+        // trigger primary node change
+        proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
+
+        // cause calls to service to fail
+        service.failOnCalls = true;
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+
+        final String key = proc.getKey("/test");
+
+        // wait just a bit to ensure that the timestamp changes when we 
update the service
+        final Object curVal = service.values.get(key);
+        try {
+            Thread.sleep(10L);
+        } catch (final InterruptedException ie) {
+        }
+
+        service.failOnCalls = false;
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+        // ensure state saved both locally & remotely
+        assertTrue(proc.localStateSaved);
+        assertNotNull(service.values.get(key));
+        assertNotSame(curVal, service.values.get(key));
+    }
+
+
+    private FsPermission create777() {
+        return new FsPermission((short) 0777);
+    }
+
+
+    private class ListHDFSWithMockedFileSystem extends ListHDFS {
+        private final MockFileSystem fileSystem = new MockFileSystem();
+        private boolean localStateSaved = false;
+
+        @Override
+        protected FileSystem getFileSystem() {
+            return fileSystem;
+        }
+
+        @Override
+        protected File getPersistenceFile() {
+            return new File("target/conf/state-file");
+        }
+
+        @Override
+        protected FileSystem getFileSystem(final Configuration config) throws 
IOException {
+            return fileSystem;
+        }
+
+        @Override
+        protected void persistLocalState(final String directory, final String 
serializedState) throws IOException {
+            super.persistLocalState(directory, serializedState);
+            localStateSaved = true;
+        }
+    }
+
+
+    private class MockFileSystem extends FileSystem {
+        private final Map<Path, Set<FileStatus>> fileStatuses = new 
HashMap<>();
+
+        public void addFileStatus(final Path parent, final FileStatus child) {
+            Set<FileStatus> children = fileStatuses.get(parent);
+            if ( children == null ) {
+                children = new HashSet<>();
+                fileStatuses.put(parent, children);
+            }
+
+            children.add(child);
+        }
+
+
+        @Override
+        public long getDefaultBlockSize() {
+            return 1024L;
+        }
+
+        @Override
+        public short getDefaultReplication() {
+            return 1;
+        }
+
+        @Override
+        public URI getUri() {
+            return null;
+        }
+
+        @Override
+        public FSDataInputStream open(final Path f, final int bufferSize) 
throws IOException {
+            return null;
+        }
+
+        @Override
+        public FSDataOutputStream create(final Path f, final FsPermission 
permission, final boolean overwrite, final int bufferSize, final short 
replication,
+                final long blockSize, final Progressable progress) throws 
IOException {
+            return null;
+        }
+
+        @Override
+        public FSDataOutputStream append(final Path f, final int bufferSize, 
final Progressable progress) throws IOException {
+            return null;
+        }
+
+        @Override
+        public boolean rename(final Path src, final Path dst) throws 
IOException {
+            return false;
+        }
+
+        @Override
+        public boolean delete(final Path f, final boolean recursive) throws 
IOException {
+            return false;
+        }
+
+        @Override
+        public FileStatus[] listStatus(final Path f) throws 
FileNotFoundException, IOException {
+            final Set<FileStatus> statuses = fileStatuses.get(f);
+            if ( statuses == null ) {
+                return new FileStatus[0];
+            }
+
+            return statuses.toArray(new FileStatus[statuses.size()]);
+        }
+
+        @Override
+        public void setWorkingDirectory(final Path new_dir) {
+
+        }
+
+        @Override
+        public Path getWorkingDirectory() {
+            return new Path(new File(".").getAbsolutePath());
+        }
+
+        @Override
+        public boolean mkdirs(final Path f, final FsPermission permission) 
throws IOException {
+            return false;
+        }
+
+        @Override
+        public FileStatus getFileStatus(final Path f) throws IOException {
+            return null;
+        }
+
+    }
+
+
+    private class MockCacheClient extends AbstractControllerService implements 
DistributedMapCacheClient {
+        private final ConcurrentMap<Object, Object> values = new 
ConcurrentHashMap<>();
+        private boolean failOnCalls = false;
+
+        private void verifyNotFail() throws IOException {
+            if ( failOnCalls ) {
+                throw new IOException("Could not call to remote service 
because Unit Test marked service unavailable");
+            }
+        }
+
+        @Override
+        public <K, V> boolean putIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
+            verifyNotFail();
+            final Object retValue = values.putIfAbsent(key, value);
+            return (retValue == null);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <K, V> V getAndPutIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer,
+                final Deserializer<V> valueDeserializer) throws IOException {
+            verifyNotFail();
+            return (V) values.putIfAbsent(key, value);
+        }
+
+        @Override
+        public <K> boolean containsKey(final K key, final Serializer<K> 
keySerializer) throws IOException {
+            verifyNotFail();
+            return values.containsKey(key);
+        }
+
+        @Override
+        public <K, V> void put(final K key, final V value, final Serializer<K> 
keySerializer, final Serializer<V> valueSerializer) throws IOException {
+            verifyNotFail();
+            values.put(key, value);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <K, V> V get(final K key, final Serializer<K> keySerializer, 
final Deserializer<V> valueDeserializer) throws IOException {
+            verifyNotFail();
+            return (V) values.get(key);
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public <K> boolean remove(final K key, final Serializer<K> serializer) 
throws IOException {
+            verifyNotFail();
+            values.remove(key);
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f5b6225/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
index 3beab65..9ea793d 100644
--- 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
+++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
@@ -121,13 +121,13 @@ public class IndexConfiguration {
         }
     }
 
-    public File getWritableIndexDirectory(final File provenanceLogFile) {
+    public File getWritableIndexDirectory(final File provenanceLogFile, final 
long newIndexTimestamp) {
         lock.lock();
         try {
             final File storageDirectory = provenanceLogFile.getParentFile();
             List<File> indexDirectories = 
this.indexDirectoryMap.get(storageDirectory);
             if (indexDirectories == null) {
-                final File newDir = addNewIndex(storageDirectory, 
provenanceLogFile);
+                final File newDir = addNewIndex(storageDirectory, 
provenanceLogFile, newIndexTimestamp);
                 indexDirectories = new ArrayList<>();
                 indexDirectories.add(newDir);
                 indexDirectoryMap.put(storageDirectory, indexDirectories);
@@ -135,7 +135,7 @@ public class IndexConfiguration {
             }
 
             if (indexDirectories.isEmpty()) {
-                final File newDir = addNewIndex(storageDirectory, 
provenanceLogFile);
+                final File newDir = addNewIndex(storageDirectory, 
provenanceLogFile, newIndexTimestamp);
                 indexDirectories.add(newDir);
                 return newDir;
             }
@@ -143,7 +143,7 @@ public class IndexConfiguration {
             final File lastDir = indexDirectories.get(indexDirectories.size() 
- 1);
             final long size = getSize(lastDir);
             if (size > repoConfig.getDesiredIndexSize()) {
-                final File newDir = addNewIndex(storageDirectory, 
provenanceLogFile);
+                final File newDir = addNewIndex(storageDirectory, 
provenanceLogFile, newIndexTimestamp);
                 indexDirectories.add(newDir);
                 return newDir;
             } else {
@@ -154,14 +154,14 @@ public class IndexConfiguration {
         }
     }
 
-    private File addNewIndex(final File storageDirectory, final File 
provenanceLogFile) {
+    private File addNewIndex(final File storageDirectory, final File 
provenanceLogFile, final long newIndexTimestamp) {
         // Build the event time of the first record into the index's filename 
so that we can determine
         // which index files to look at when we perform a search. We use the 
timestamp of the first record
         // in the Provenance Log file, rather than the current time, because 
we may perform the Indexing
         // retroactively.
         Long firstEntryTime = getFirstEntryTime(provenanceLogFile);
         if (firstEntryTime == null) {
-            firstEntryTime = System.currentTimeMillis();
+            firstEntryTime = newIndexTimestamp;
         }
         return new File(storageDirectory, "index-" + firstEntryTime);
     }
@@ -222,7 +222,7 @@ public class IndexConfiguration {
                 }
             });
 
-            for (File indexDir : sortedIndexDirectories) {
+            for (final File indexDir : sortedIndexDirectories) {
                 // If the index was last modified before the start time, we 
know that it doesn't
                 // contain any data for us to query.
                 if (startTime != null && indexDir.lastModified() < startTime) {
@@ -282,7 +282,7 @@ public class IndexConfiguration {
             }
 
             boolean foundIndexCreatedLater = false;
-            for (File indexDir : sortedIndexDirectories) {
+            for (final File indexDir : sortedIndexDirectories) {
                 // If the index was last modified before the log file was 
created, we know the index doesn't include
                 // any data for the provenance log.
                 if (indexDir.lastModified() < firstEntryTime) {
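
The addNewIndex() change above swaps System.currentTimeMillis() for a caller-supplied timestamp, but the naming scheme stays the same: the index directory name embeds the event time of the first record it covers. A hedged sketch of just that naming logic; the helper name is illustrative, and getFirstEntryTime() is replaced here by a plain parameter:

    import java.io.File;

    public class IndexNamingSketch {
        // Mirrors the fallback in addNewIndex(): use the first event time when the
        // provenance log already has entries, otherwise the supplied newIndexTimestamp.
        static File indexDirectoryFor(final File storageDirectory, final Long firstEntryTime,
                final long newIndexTimestamp) {
            final long timestamp = (firstEntryTime == null) ? newIndexTimestamp : firstEntryTime;
            return new File(storageDirectory, "index-" + timestamp);
        }
    }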
