http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 1dd5b91,361f1ed..f7894d9
--- 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@@ -58,13 -58,18 +58,15 @@@ import org.apache.nifi.processor.except
  import org.apache.nifi.processor.util.StandardValidators;
  import org.apache.nifi.util.StopWatch;
  
 -/**
 - * This processor reads files from HDFS into NiFi FlowFiles.
 - */
  @TriggerWhenEmpty
  @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"})
 -@CapabilityDescription("Fetch files from Hadoop Distributed File System 
(HDFS) into FlowFiles")
 +@CapabilityDescription("Fetch files from Hadoop Distributed File System 
(HDFS) into FlowFiles. This Processor will delete the file from HDFS after 
fetching it.")
  @WritesAttributes({
-         @WritesAttribute(attribute = "filename", description = "The name of 
the file that was read from HDFS."),
-         @WritesAttribute(attribute = "path", description = "The path is set 
to the relative path of the file's directory on HDFS. For example, if the 
Directory property is set to /tmp, then files picked up from /tmp will have the 
path attribute set to \"./\". If the Recurse Subdirectories property is set to 
true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will 
be set to \"abc/1/2/3\".") })
+     @WritesAttribute(attribute = "filename", description = "The name of the 
file that was read from HDFS."),
 -    @WritesAttribute(attribute = "path", description = "The path is set to 
the relative path of the file's directory on HDFS. For example, if "
 -            + "the Directory property is set to /tmp, then files picked up 
from /tmp will have the path attribute set to \"./\". If the Recurse "
 -            + "Subdirectories property is set to true and a file is picked up 
from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".")})
 -@SeeAlso(PutHDFS.class)
++    @WritesAttribute(attribute = "path", description = "The path is set to 
the relative path of the file's directory on HDFS. For example, if the 
Directory property "
++            + "is set to /tmp, then files picked up from /tmp will have the 
path attribute set to \"./\". If the Recurse Subdirectories property is set to 
true and "
++            + "a file is picked up from /tmp/abc/1/2/3, then the path 
attribute will be set to \"abc/1/2/3\".") })
 +@SeeAlso({PutHDFS.class, ListHDFS.class})
  public class GetHDFS extends AbstractHadoopProcessor {
  
      public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
@@@ -73,105 -78,101 +75,101 @@@
  
      // relationships
      public static final Relationship REL_SUCCESS = new Relationship.Builder()
--            .name("success")
--            .description("All files retrieved from HDFS are transferred to 
this relationship")
--            .build();
++    .name("success")
++    .description("All files retrieved from HDFS are transferred to this 
relationship")
++    .build();
  
      public static final Relationship REL_PASSTHROUGH = new 
Relationship.Builder()
--            .name("passthrough")
--            .description(
--                    "If this processor has an input queue for some reason, 
then FlowFiles arriving on that input are transferred to this relationship")
++    .name("passthrough")
++    .description(
++            "If this processor has an input queue for some reason, then 
FlowFiles arriving on that input are transferred to this relationship")
              .build();
  
      // properties
      public static final PropertyDescriptor DIRECTORY = new 
PropertyDescriptor.Builder()
--            .name(DIRECTORY_PROP_NAME)
--            .description("The HDFS directory from which files should be read")
--            .required(true)
--            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
--            .build();
++    .name(DIRECTORY_PROP_NAME)
++    .description("The HDFS directory from which files should be read")
++    .required(true)
++    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
++    .build();
  
      public static final PropertyDescriptor RECURSE_SUBDIRS = new 
PropertyDescriptor.Builder()
--            .name("Recurse Subdirectories")
--            .description("Indicates whether to pull files from subdirectories 
of the HDFS directory")
--            .required(true)
--            .allowableValues("true", "false")
--            .defaultValue("true")
--            .build();
++    .name("Recurse Subdirectories")
++    .description("Indicates whether to pull files from subdirectories of the 
HDFS directory")
++    .required(true)
++    .allowableValues("true", "false")
++    .defaultValue("true")
++    .build();
  
      public static final PropertyDescriptor KEEP_SOURCE_FILE = new 
PropertyDescriptor.Builder()
--            .name("Keep Source File")
-             .description("Determines whether to delete the file from HDFS 
after it has been successfully transferred. If true, the file will be fetched 
repeatedly. This is intended for testing only.")
 -            .description("Determines whether to delete the file from HDFS 
after it has been successfully transferred")
--            .required(true)
--            .allowableValues("true", "false")
--            .defaultValue("false")
--            .build();
++    .name("Keep Source File")
++    .description("Determines whether to delete the file from HDFS after it 
has been successfully transferred. If true, the file will be fetched 
repeatedly. This is intended for testing only.")
++    .required(true)
++    .allowableValues("true", "false")
++    .defaultValue("false")
++    .build();
  
      public static final PropertyDescriptor FILE_FILTER_REGEX = new 
PropertyDescriptor.Builder()
--            .name("File Filter Regex")
-             .description(
-                     "A Java Regular Expression for filtering Filenames; if a 
filter is supplied then only files whose names match that Regular Expression 
will be fetched, otherwise all files will be fetched")
 -            .description("A Java Regular Expression for filtering Filenames; 
if a filter is supplied then only files whose names match that Regular "
 -                    + "Expression will be fetched, otherwise all files will 
be fetched")
++    .name("File Filter Regex")
++    .description("A Java Regular Expression for filtering Filenames; if a 
filter is supplied then only files whose names match that Regular "
++            + "Expression will be fetched, otherwise all files will be 
fetched")
              .required(false)
              .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
              .build();
  
      public static final PropertyDescriptor FILTER_MATCH_NAME_ONLY = new 
PropertyDescriptor.Builder()
--            .name("Filter Match Name Only")
-             .description(
-                     "If true then File Filter Regex will match on just the 
filename, otherwise subdirectory names will be included with filename in the 
regex comparison")
 -            .description("If true then File Filter Regex will match on just 
the filename, otherwise subdirectory names will be included with filename "
 -                    + "in the regex comparison")
++    .name("Filter Match Name Only")
++    .description("If true then File Filter Regex will match on just the 
filename, otherwise subdirectory names will be included with filename "
++            + "in the regex comparison")
              .required(true)
              .allowableValues("true", "false")
              .defaultValue("true")
              .build();
  
      public static final PropertyDescriptor IGNORE_DOTTED_FILES = new 
PropertyDescriptor.Builder()
--            .name("Ignore Dotted Files")
--            .description("If true, files whose names begin with a dot (\".\") 
will be ignored")
--            .required(true)
--            .allowableValues("true", "false")
--            .defaultValue("true")
--            .build();
++    .name("Ignore Dotted Files")
++    .description("If true, files whose names begin with a dot (\".\") will be 
ignored")
++    .required(true)
++    .allowableValues("true", "false")
++    .defaultValue("true")
++    .build();
  
      public static final PropertyDescriptor MIN_AGE = new 
PropertyDescriptor.Builder()
--            .name("Minimum File Age")
-             .description(
-                     "The minimum age that a file must be in order to be 
pulled; any file younger than this amount of time (based on last modification 
date) will be ignored")
 -            .description("The minimum age that a file must be in order to be 
pulled; any file younger than this amount of time (based on last modification 
date) will be ignored")
--            .required(true)
-             .addValidator(
-                     StandardValidators.createTimePeriodValidator(0, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
 -            .addValidator(StandardValidators.createTimePeriodValidator(0, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
--            .defaultValue("0 sec")
--            .build();
++    .name("Minimum File Age")
++    .description("The minimum age that a file must be in order to be pulled; 
any file younger than this amount of time (based on last modification date) 
will be ignored")
++    .required(true)
++    .addValidator(StandardValidators.createTimePeriodValidator(0, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
++    .defaultValue("0 sec")
++    .build();
  
      public static final PropertyDescriptor MAX_AGE = new 
PropertyDescriptor.Builder()
--            .name("Maximum File Age")
-             .description(
-                     "The maximum age that a file must be in order to be 
pulled; any file older than this amount of time (based on last modification 
date) will be ignored")
 -            .description("The maximum age that a file must be in order to be 
pulled; any file older than this amount of time (based on last modification 
date) will be ignored")
--            .required(false)
-             .addValidator(
-                     StandardValidators.createTimePeriodValidator(100, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
 -            .addValidator(StandardValidators.createTimePeriodValidator(100, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
--            .build();
++    .name("Maximum File Age")
++    .description("The maximum age that a file must be in order to be pulled; 
any file older than this amount of time (based on last modification date) will 
be ignored")
++    .required(false)
++    .addValidator(StandardValidators.createTimePeriodValidator(100, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
++    .build();
  
      public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
--            .name("Batch Size")
--            .description("The maximum number of files to pull in each 
iteration, based on run schedule.")
--            .required(true)
--            .defaultValue("100")
--            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
--            .build();
++    .name("Batch Size")
++    .description("The maximum number of files to pull in each iteration, 
based on run schedule.")
++    .required(true)
++    .defaultValue("100")
++    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
++    .build();
  
      public static final PropertyDescriptor POLLING_INTERVAL = new 
PropertyDescriptor.Builder()
--            .name("Polling Interval")
--            .description("Indicates how long to wait between performing 
directory listings")
--            .required(true)
--            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
--            .defaultValue("0 sec")
--            .build();
++    .name("Polling Interval")
++    .description("Indicates how long to wait between performing directory 
listings")
++    .required(true)
++    .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
++    .defaultValue("0 sec")
++    .build();
  
      public static final PropertyDescriptor BUFFER_SIZE = new 
PropertyDescriptor.Builder()
--            .name("IO Buffer Size")
--            .description("Amount of memory to use to buffer file contents 
during IO. This overrides the Hadoop Configuration")
--            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
--            .build();
++    .name("IO Buffer Size")
++    .description("Amount of memory to use to buffer file contents during IO. 
This overrides the Hadoop Configuration")
++    .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
++    .build();
  
      private static final Set<Relationship> relationships;
      protected static final List<PropertyDescriptor> localProperties;
@@@ -461,11 -463,35 +460,8 @@@
          return files;
      }
  
-     
- 
      /**
-      * Holder for a snapshot in time of some processor properties that are
-      * passed around.
 -     * Returns the relative path of the child that does not include the 
filename or the root path.
 -     *
 -     * @param root root
 -     * @param child child
 -     * @return the relative path of the child that does not include the 
filename or the root path
 -     */
 -    public static String getPathDifference(final Path root, final Path child) 
{
 -        final int depthDiff = child.depth() - root.depth();
 -        if (depthDiff <= 1) {
 -            return "".intern();
 -        }
 -        String lastRoot = root.getName();
 -        Path childsParent = child.getParent();
 -        final StringBuilder builder = new StringBuilder();
 -        builder.append(childsParent.getName());
 -        for (int i = (depthDiff - 3); i >= 0; i--) {
 -            childsParent = childsParent.getParent();
 -            String name = childsParent.getName();
 -            if (name.equals(lastRoot) && 
childsParent.toString().endsWith(root.toString())) {
 -                break;
 -            }
 -            builder.insert(0, Path.SEPARATOR).insert(0, name);
 -        }
 -        return builder.toString();
 -    }
 -
 -    /**
+      * Holder for a snapshot in time of some processor properties that are 
passed around.
       */
      protected static class ProcessorConfiguration {
  

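The GetHDFS hunks above are largely whitespace and line-wrapping changes to the fluent
PropertyDescriptor and Relationship builders that NiFi processors use to declare their
configuration. As a point of reference only (this sketch is not part of the commit, and the
class and property names are purely illustrative), the builder pattern being re-indented
looks like this when written out on its own:

    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.processor.Relationship;
    import org.apache.nifi.processor.util.StandardValidators;

    public class ExampleDescriptors {

        // Required free-form property guarded by a non-empty validator,
        // in the same style as the DIRECTORY descriptor above.
        public static final PropertyDescriptor EXAMPLE_DIRECTORY = new PropertyDescriptor.Builder()
            .name("Example Directory")
            .description("Illustrative directory property")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

        // true/false property with a default, in the same style as Recurse Subdirectories.
        public static final PropertyDescriptor EXAMPLE_RECURSE = new PropertyDescriptor.Builder()
            .name("Example Recurse")
            .description("Illustrative true/false property")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("true")
            .build();

        // Relationship declared with the same builder style.
        public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Illustrative success relationship")
            .build();
    }
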
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 707b50d,0000000..56a128a
mode 100644,000000..100644
--- 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@@ -1,466 -1,0 +1,469 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.nifi.processors.hadoop;
 +
 +import java.io.File;
 +import java.io.FileInputStream;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.Date;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Properties;
 +import java.util.Set;
 +
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.fs.permission.FsAction;
 +import org.apache.hadoop.fs.permission.FsPermission;
 +import org.apache.nifi.annotation.behavior.TriggerSerially;
 +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 +import org.apache.nifi.annotation.behavior.WritesAttribute;
 +import org.apache.nifi.annotation.behavior.WritesAttributes;
 +import org.apache.nifi.annotation.documentation.CapabilityDescription;
 +import org.apache.nifi.annotation.documentation.SeeAlso;
 +import org.apache.nifi.annotation.documentation.Tags;
 +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 +import org.apache.nifi.annotation.notification.PrimaryNodeState;
 +import org.apache.nifi.components.PropertyDescriptor;
 +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 +import org.apache.nifi.flowfile.FlowFile;
 +import org.apache.nifi.flowfile.attributes.CoreAttributes;
 +import org.apache.nifi.processor.ProcessContext;
 +import org.apache.nifi.processor.ProcessSession;
 +import org.apache.nifi.processor.ProcessorInitializationContext;
 +import org.apache.nifi.processor.Relationship;
 +import org.apache.nifi.processor.exception.ProcessException;
 +import org.apache.nifi.processor.util.StandardValidators;
 +import org.apache.nifi.processors.hadoop.util.HDFSListing;
 +import org.apache.nifi.processors.hadoop.util.StringSerDe;
 +import org.codehaus.jackson.JsonGenerationException;
 +import org.codehaus.jackson.JsonNode;
 +import org.codehaus.jackson.JsonParseException;
 +import org.codehaus.jackson.map.JsonMappingException;
 +import org.codehaus.jackson.map.ObjectMapper;
 +
 +
 +@TriggerSerially
 +@TriggerWhenEmpty
 +@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
 +@CapabilityDescription("Retrieves a listing of files from HDFS. For each file 
that is listed in HDFS, creates a FlowFile that represents "
-               + "the HDFS file so that it can be fetched in conjunction with 
ListHDFS. This Processor is designed to run on Primary Node only "
-               + "in a cluster. If the primary node changes, the new Primary 
Node will pick up where the previous node left off without duplicating "
-               + "all of the data. Unlike GetHDFS, this Processor does not 
delete any data from HDFS.")
++        + "the HDFS file so that it can be fetched in conjunction with 
ListHDFS. This Processor is designed to run on Primary Node only "
++        + "in a cluster. If the primary node changes, the new Primary Node 
will pick up where the previous node left off without duplicating "
++        + "all of the data. Unlike GetHDFS, this Processor does not delete 
any data from HDFS.")
 +@WritesAttributes({
-         @WritesAttribute(attribute="filename", description="The name of the 
file that was read from HDFS."),
-         @WritesAttribute(attribute="path", description="The path is set to 
the absolute path of the file's directory on HDFS. For example, if the 
Directory property is set to /tmp, then files picked up from /tmp will have the 
path attribute set to \"./\". If the Recurse Subdirectories property is set to 
true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will 
be set to \"/tmp/abc/1/2/3\"."),
-               @WritesAttribute(attribute="hdfs.owner", description="The user 
that owns the file in HDFS"),
-               @WritesAttribute(attribute="hdfs.group", description="The group 
that owns the file in HDFS"),
-               @WritesAttribute(attribute="hdfs.lastModified", 
description="The timestamp of when the file in HDFS was last modified, as 
milliseconds since midnight Jan 1, 1970 UTC"),
-               @WritesAttribute(attribute="hdfs.length", description="The 
number of bytes in the file in HDFS"),
-               @WritesAttribute(attribute="hdfs.replication", description="The 
number of HDFS replicas for hte file"),
-               @WritesAttribute(attribute="hdfs.permissions", description="The 
permissions for the file in HDFS. This is formatted as 3 characters for the 
owner, 3 for the group, and 3 for other users. For example rw-rw-r--")
++    @WritesAttribute(attribute="filename", description="The name of the file 
that was read from HDFS."),
++    @WritesAttribute(attribute="path", description="The path is set to the 
absolute path of the file's directory on HDFS. For example, if the Directory 
property is set to /tmp, "
++            + "then files picked up from /tmp will have the path attribute 
set to \"./\". If the Recurse Subdirectories property is set to true and a file 
is picked up "
++            + "from /tmp/abc/1/2/3, then the path attribute will be set to 
\"/tmp/abc/1/2/3\"."),
++    @WritesAttribute(attribute="hdfs.owner", description="The user that owns 
the file in HDFS"),
++    @WritesAttribute(attribute="hdfs.group", description="The group that owns 
the file in HDFS"),
++    @WritesAttribute(attribute="hdfs.lastModified", description="The 
timestamp of when the file in HDFS was last modified, as milliseconds since 
midnight Jan 1, 1970 UTC"),
++    @WritesAttribute(attribute="hdfs.length", description="The number of 
bytes in the file in HDFS"),
++    @WritesAttribute(attribute="hdfs.replication", description="The number of 
HDFS replicas for the file"),
++    @WritesAttribute(attribute="hdfs.permissions", description="The 
permissions for the file in HDFS. This is formatted as 3 characters for the 
owner, "
++            + "3 for the group, and 3 for other users. For example rw-rw-r--")
 +})
 +@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
 +public class ListHDFS extends AbstractHadoopProcessor {
- 
-       public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new 
PropertyDescriptor.Builder()
-               .name("Distributed Cache Service")
-               .description("Specifies the Controller Service that should be 
used to maintain state about what has been pulled from HDFS so that if a new 
node begins pulling data, it won't duplicate all of the work that has been 
done.")
-               .required(true)
-               .identifiesControllerService(DistributedMapCacheClient.class)
-               .build();
++    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new 
PropertyDescriptor.Builder()
++        .name("Distributed Cache Service")
++        .description("Specifies the Controller Service that should be used to 
maintain state about what has been pulled from HDFS so that if a new node "
++                + "begins pulling data, it won't duplicate all of the work 
that has been done.")
++        .required(true)
++        .identifiesControllerService(DistributedMapCacheClient.class)
++        .build();
 +
 +    public static final PropertyDescriptor DIRECTORY = new 
PropertyDescriptor.Builder()
-           .name(DIRECTORY_PROP_NAME)
-           .description("The HDFS directory from which files should be read")
-           .required(true)
-           .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-           .build();
-       
-       public static final PropertyDescriptor RECURSE_SUBDIRS = new 
PropertyDescriptor.Builder()
-           .name("Recurse Subdirectories")
-           .description("Indicates whether to list files from subdirectories 
of the HDFS directory")
-           .required(true)
-           .allowableValues("true", "false")
-           .defaultValue("true")
-           .build();
-       
-       
++        .name(DIRECTORY_PROP_NAME)
++        .description("The HDFS directory from which files should be read")
++        .required(true)
++        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
++        .build();
++
++    public static final PropertyDescriptor RECURSE_SUBDIRS = new 
PropertyDescriptor.Builder()
++        .name("Recurse Subdirectories")
++        .description("Indicates whether to list files from subdirectories of 
the HDFS directory")
++        .required(true)
++        .allowableValues("true", "false")
++        .defaultValue("true")
++        .build();
++
++
 +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-           .name("success")
-           .description("All FlowFiles are transferred to this relationship")
-           .build();
++        .name("success")
++        .description("All FlowFiles are transferred to this relationship")
++        .build();
 +
 +    private volatile Long lastListingTime = null;
 +    private volatile Set<Path> latestPathsListed = new HashSet<>();
 +    private volatile boolean electedPrimaryNode = false;
 +    private File persistenceFile = null;
-     
++
 +    @Override
 +    protected void init(final ProcessorInitializationContext context) {
-       super.init(context);
-       persistenceFile = new File("conf/state/" + getIdentifier());
++        super.init(context);
++        persistenceFile = new File("conf/state/" + getIdentifier());
++    }
++
++    @Override
++    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
++        final List<PropertyDescriptor> properties = new ArrayList<>();
++        properties.add(HADOOP_CONFIGURATION_RESOURCES);
++        properties.add(DISTRIBUTED_CACHE_SERVICE);
++        properties.add(DIRECTORY);
++        properties.add(RECURSE_SUBDIRS);
++        return properties;
++    }
++
++    @Override
++    public Set<Relationship> getRelationships() {
++        final Set<Relationship> relationships = new HashSet<>();
++        relationships.add(REL_SUCCESS);
++        return relationships;
++    }
++
++    private String getKey(final String directory) {
++        return getIdentifier() + ".lastListingTime." + directory;
++    }
++
++    @OnPrimaryNodeStateChange
++    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
++        if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) {
++            electedPrimaryNode = true;
++        }
++    }
++
++    @Override
++    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
++        if ( descriptor.equals(DIRECTORY) ) {
++            lastListingTime = null; // clear lastListingTime so that we have 
to fetch new time
++            latestPathsListed = new HashSet<>();
++        }
 +    }
-       
-       @Override
-       protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-               final List<PropertyDescriptor> properties = new ArrayList<>();
-               properties.add(HADOOP_CONFIGURATION_RESOURCES);
-               properties.add(DISTRIBUTED_CACHE_SERVICE);
-               properties.add(DIRECTORY);
-               properties.add(RECURSE_SUBDIRS);
-               return properties;
-       }
-       
-       @Override
-       public Set<Relationship> getRelationships() {
-               final Set<Relationship> relationships = new HashSet<>();
-               relationships.add(REL_SUCCESS);
-               return relationships;
-       }
- 
-       private String getKey(final String directory) {
-               return getIdentifier() + ".lastListingTime." + directory;
-       }
-       
-       @OnPrimaryNodeStateChange
-       public void onPrimaryNodeChange(final PrimaryNodeState newState) {
-               if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) {
-                       electedPrimaryNode = true;
-               }
-       }
-       
-       @Override
-       public void onPropertyModified(final PropertyDescriptor descriptor, 
final String oldValue, final String newValue) {
-               if ( descriptor.equals(DIRECTORY) ) {
-                       lastListingTime = null; // clear lastListingTime so 
that we have to fetch new time
-                       latestPathsListed = new HashSet<>();
-               }
-       }
-       
-       private HDFSListing deserialize(final String serializedState) throws 
JsonParseException, JsonMappingException, IOException {
-               final ObjectMapper mapper = new ObjectMapper();
++
++    private HDFSListing deserialize(final String serializedState) throws 
JsonParseException, JsonMappingException, IOException {
++        final ObjectMapper mapper = new ObjectMapper();
 +        final JsonNode jsonNode = mapper.readTree(serializedState);
 +        return mapper.readValue(jsonNode, HDFSListing.class);
-       }
-       
-       
-       @Override
-       public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
-               final String directory = 
context.getProperty(DIRECTORY).getValue();
- 
-               // Determine the timestamp for the last file that we've listed.
-               Long minTimestamp = lastListingTime;
-               if ( minTimestamp == null || electedPrimaryNode ) {
-                       // We haven't yet restored any state from local or 
distributed state - or it's been at least a minute since
-                       // we have performed a listing. In this case, 
-                       // First, attempt to get timestamp from distributed 
cache service.
-                       final DistributedMapCacheClient client = 
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
-                       
-                       try {
-                               final StringSerDe serde = new StringSerDe();
-                               final String serializedState = 
client.get(getKey(directory), serde, serde);
-                               if ( serializedState == null || 
serializedState.isEmpty() ) {
-                                       minTimestamp = null;
-                                       this.latestPathsListed = 
Collections.emptySet();
-                               } else {
-                                       final HDFSListing listing = 
deserialize(serializedState);
-                                       this.lastListingTime = 
listing.getLatestTimestamp().getTime();
-                                       minTimestamp = 
listing.getLatestTimestamp().getTime();
-                                       this.latestPathsListed = 
listing.toPaths();
-                               }
-                               
-                               this.lastListingTime = minTimestamp;
-                               electedPrimaryNode = false;     // no 
requirement to pull an update from the distributed cache anymore.
-                       } catch (final IOException ioe) {
-                               getLogger().error("Failed to retrieve timestamp 
of last listing from Distributed Cache Service. Will not perform listing until 
this is accomplished.");
-                               context.yield();
-                               return;
-                       }
-                       
-                       // Check the persistence file. We want to use the 
latest timestamp that we have so that
-                       // we don't duplicate data.
-                       try {
-                               if ( persistenceFile.exists() ) {
-                                       try (final FileInputStream fis = new 
FileInputStream(persistenceFile)) {
-                                               final Properties props = new 
Properties();
-                                               props.load(fis);
- 
-                                               // get the local timestamp for 
this directory, if it exists.
-                                               final String 
locallyPersistedValue = props.getProperty(directory);
-                                               if ( locallyPersistedValue != 
null ) {
-                                                       final HDFSListing 
listing = deserialize(locallyPersistedValue);
-                                                       final long 
localTimestamp = listing.getLatestTimestamp().getTime();
-                                                       
-                                                       // If distributed state 
doesn't have an entry or the local entry is later than the distributed state,
-                                                       // update the 
distributed state so that we are in sync.
-                                                       if (minTimestamp == 
null || localTimestamp > minTimestamp) {
-                                                               minTimestamp = 
localTimestamp;
-                                                               
-                                                               // Our local 
persistence file shows a later time than the Distributed service.
-                                                               // Update the 
distributed service to match our local state.
-                                                               try {
-                                                                       final 
StringSerDe serde = new StringSerDe();
-                                                                       
client.put(getKey(directory), locallyPersistedValue, serde, serde);
-                                                               } catch (final 
IOException ioe) {
-                                                                       
getLogger().warn("Local timestamp for {} is {}, which is later than Distributed 
state but failed to update Distributed "
-                                                                               
        + "state due to {}. If a new node performs HDFS Listing, data 
duplication may occur", 
-                                                                               
        new Object[] {directory, locallyPersistedValue, ioe});
-                                                               }
-                                                       }
-                                               }
-                                       }
-                               }
-                       } catch (final IOException ioe) {
-                               getLogger().warn("Failed to recover local state 
due to {}. Assuming that the state from the distributed cache is correct.", 
ioe);
-                       }
-               }
-               
-               
-               // Pull in any file that is newer than the timestamp that we 
have.
-               final FileSystem hdfs = hdfsResources.get().getValue();
-               final boolean recursive = 
context.getProperty(RECURSE_SUBDIRS).asBoolean();
-               final Path rootPath = new Path(directory);
-               
-               int listCount = 0;
-               Long latestListingModTime = null;
-               final Set<FileStatus> statuses;
-               try {
-                       statuses = getStatuses(rootPath, recursive, hdfs);
-                       for ( final FileStatus status : statuses ) {
-                               // don't get anything where the last modified 
timestamp is equal to our current timestamp.
-                               // if we do, then we run the risk of multiple 
files having the same last mod date but us only
-                               // seeing a portion of them.
-                               // I.e., there could be 5 files with last mod 
date = (now). But if we do the listing now, maybe
-                               // only 2 exist and 3 more will exist later in 
this millisecond. So we ignore anything with a
-                               // modified date not before the current time.
-                               final long fileModTime = 
status.getModificationTime();
-                               
-                               // we only want the file if its timestamp is 
later than the minTimestamp or equal to and we didn't pull it last time.
-                               // Also, HDFS creates files with the suffix 
_COPYING_ when they are being written - we want to ignore those.
-                               boolean fetch = 
!status.getPath().getName().endsWith("_COPYING_") &&
-                                               (minTimestamp == null || 
fileModTime > minTimestamp || (fileModTime == minTimestamp && 
!latestPathsListed.contains(status.getPath())));
-                               
-                               // Create the FlowFile for this path.
-                               if ( fetch ) {
-                                       final Map<String, String> attributes = 
createAttributes(status);
-                                       FlowFile flowFile = session.create();
-                                       flowFile = 
session.putAllAttributes(flowFile, attributes);
-                                       session.transfer(flowFile, REL_SUCCESS);
-                                       listCount++;
-                                       
-                                       if ( latestListingModTime == null || 
fileModTime > latestListingModTime ) {
-                                               latestListingModTime = 
fileModTime;
-                                       }
-                               }
-                       }
-               } catch (final IOException ioe) {
-                       getLogger().error("Failed to perform listing of HDFS 
due to {}", new Object[] {ioe});
-                       return;
-               }
-               
-               if ( listCount > 0 ) {
-                       getLogger().info("Successfully created listing with {} 
new files from HDFS", new Object[] {listCount});
-                       session.commit();
- 
-                       // We have performed a listing and pushed the FlowFiles 
out.
-                       // Now, we need to persist state about the Last 
Modified timestamp of the newest file
-                       // that we pulled in. We do this in order to avoid 
pulling in the same file twice.
-                       // However, we want to save the state both locally and 
remotely.
-                       // We store the state remotely so that if a new Primary 
Node is chosen, it can pick up where the
-                       // previously Primary Node left off.
-                       // We also store the state locally so that if the node 
is restarted, and the node cannot contact
-                       // the distributed state cache, the node can continue 
to run (if it is primary node).
-                       String serializedState = null;
-                       try {
-                               serializedState = 
serializeState(latestListingModTime, statuses);
-                       } catch (final Exception e) {
-                               getLogger().error("Failed to serialize state 
due to {}", new Object[] {e});
-                       }
-                       
-                       if ( serializedState != null ) {
-                               // Save our state locally.
-                               try {
-                                       persistLocalState(directory, 
serializedState);
-                               } catch (final IOException ioe) {
-                                       getLogger().warn("Unable to save state 
locally. If the node is restarted now, data may be duplicated. Failure is due 
to {}", ioe);
-                               }
-       
-                               // Attempt to save state to remote server.
-                               final DistributedMapCacheClient client = 
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
-                               try {
-                                       client.put(getKey(directory), 
serializedState, new StringSerDe(), new StringSerDe());
-                               } catch (final IOException ioe) {
-                                       getLogger().warn("Unable to communicate 
with distributed cache server due to {}. Persisting state locally instead.", 
ioe);
-                               }
-                       }
-                       
-                       lastListingTime = latestListingModTime;
-               } else {
-                       getLogger().debug("There is no data to list. 
Yielding.");
-                       context.yield();
-                       
-                       // lastListingTime = 0 so that we don't continually 
poll the distributed cache / local file system
-                       if ( lastListingTime == null ) {
-                               lastListingTime = 0L;
-                       }
-                       
-                       return;
-               }
-       }
-       
-       private Set<FileStatus> getStatuses(final Path path, final boolean 
recursive, final FileSystem hdfs) throws IOException {
-               final Set<FileStatus> statusSet = new HashSet<>();
-               
-               final FileStatus[] statuses = hdfs.listStatus(path);
-               
-               for ( final FileStatus status : statuses ) {
-                       if ( status.isDirectory() ) {
-                               if ( recursive ) {
-                                       try {
-                                               
statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs));
-                                       } catch (final IOException ioe) {
-                                               getLogger().error("Failed to 
retrieve HDFS listing for subdirectory {} due to {}; will continue listing 
others", new Object[] {status.getPath(), ioe});
-                                       }
-                               }
-                       } else {
-                               statusSet.add(status);
-                       }
-               }
-               
-               return statusSet;
-       }
-       
-       
-       private String serializeState(final long latestListingTime, final 
Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, 
IOException {
-               // we need to keep track of all files that we pulled in that 
had a modification time equal to
-               // lastListingTime so that we can avoid pulling those files in 
again. We can't just ignore any files
-               // that have a mod time equal to that timestamp because more 
files may come in with the same timestamp
-               // later in the same millisecond.
-               if ( statuses.isEmpty() ) {
-                       return null;
-               } else {
-                       final List<FileStatus> sortedStatuses = new 
ArrayList<>(statuses);
-                       Collections.sort(sortedStatuses, new 
Comparator<FileStatus>() {
-                               @Override
-                               public int compare(final FileStatus o1, final 
FileStatus o2) {
-                                       return 
Long.compare(o1.getModificationTime(), o2.getModificationTime());
-                               }
-                       });
-                       
-                       final long latestListingModTime = 
sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime();
-                       final Set<Path> pathsWithModTimeEqualToListingModTime = 
new HashSet<>();
-                       for (int i=sortedStatuses.size() - 1; i >= 0; i--) {
-                               final FileStatus status = sortedStatuses.get(i);
-                               if (status.getModificationTime() == 
latestListingModTime) {
-                                       
pathsWithModTimeEqualToListingModTime.add(status.getPath());
-                               }
-                       }
-                       
-                       this.latestPathsListed = 
pathsWithModTimeEqualToListingModTime;
-                       
-                       final HDFSListing listing = new HDFSListing();
-                       listing.setLatestTimestamp(new 
Date(latestListingModTime));
-                       final Set<String> paths = new HashSet<>();
-                       for ( final Path path : 
pathsWithModTimeEqualToListingModTime ) {
-                               paths.add(path.toUri().toString());
-                       }
-                       listing.setMatchingPaths(paths);
- 
-                       final ObjectMapper mapper = new ObjectMapper();
-                       final String serializedState = 
mapper.writerWithType(HDFSListing.class).writeValueAsString(listing);
-                       return serializedState;
-               }
-       }
-       
-       private void persistLocalState(final String directory, final String 
serializedState) throws IOException {
-               // we need to keep track of all files that we pulled in that 
had a modification time equal to
-               // lastListingTime so that we can avoid pulling those files in 
again. We can't just ignore any files
-               // that have a mod time equal to that timestamp because more 
files may come in with the same timestamp
-               // later in the same millisecond.
-               final File dir = persistenceFile.getParentFile();
-               if ( !dir.exists() && !dir.mkdirs() ) {
-                       throw new IOException("Could not create directory " + 
dir.getAbsolutePath() + " in order to save local state");
-               }
-               
-               final Properties props = new Properties();
-               if ( persistenceFile.exists() ) {
-                       try (final FileInputStream fis = new 
FileInputStream(persistenceFile)) {
-                               props.load(fis);
-                       }
-               }
- 
-               props.setProperty(directory, serializedState);
-               
-               try (final FileOutputStream fos = new 
FileOutputStream(persistenceFile)) {
-                       props.store(fos, null);
-               }
-       }
- 
-       private String getAbsolutePath(final Path path) {
-               final Path parent = path.getParent();
-               final String prefix = (parent == null || 
parent.getName().equals("")) ? "" : getAbsolutePath(parent);
-               return prefix + "/" + path.getName();
-       }
-       
-       private Map<String, String> createAttributes(final FileStatus status) {
-               final Map<String, String> attributes = new HashMap<>();
-               attributes.put(CoreAttributes.FILENAME.key(), 
status.getPath().getName());
-               attributes.put(CoreAttributes.PATH.key(), 
getAbsolutePath(status.getPath().getParent()));
-               
-               attributes.put("hdfs.owner", status.getOwner());
-               attributes.put("hdfs.group", status.getGroup());
-               attributes.put("hdfs.lastModified", 
String.valueOf(status.getModificationTime()));
-               attributes.put("hdfs.length", String.valueOf(status.getLen()));
-               attributes.put("hdfs.replication", 
String.valueOf(status.getReplication()));
-               
-               final FsPermission permission = status.getPermission();
-               final String perms = getPerms(permission.getUserAction()) + 
getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
-               attributes.put("hdfs.permissions", perms);
-               return attributes;
-       }
-       
-       private String getPerms(final FsAction action) {
-               final StringBuilder sb = new StringBuilder();
-               if ( action.implies(FsAction.READ) ) {
-                       sb.append("r");
-               } else {
-                       sb.append("-");
-               }
-               
-               if ( action.implies(FsAction.WRITE) ) {
-                       sb.append("w");
-               } else {
-                       sb.append("-");
-               }
-               
-               if ( action.implies(FsAction.EXECUTE) ) {
-                       sb.append("x");
-               } else {
-                       sb.append("-");
-               }
-               
-               return sb.toString();
-       }
++    }
++
++
++    @Override
++    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
++        final String directory = context.getProperty(DIRECTORY).getValue();
++
++        // Determine the timestamp for the last file that we've listed.
++        Long minTimestamp = lastListingTime;
++        if ( minTimestamp == null || electedPrimaryNode ) {
++            // We haven't yet restored any state from local or distributed 
state - or it's been at least a minute since
++            // we have performed a listing. In this case,
++            // First, attempt to get timestamp from distributed cache service.
++            final DistributedMapCacheClient client = 
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
++
++            try {
++                final StringSerDe serde = new StringSerDe();
++                final String serializedState = client.get(getKey(directory), 
serde, serde);
++                if ( serializedState == null || serializedState.isEmpty() ) {
++                    minTimestamp = null;
++                    this.latestPathsListed = Collections.emptySet();
++                } else {
++                    final HDFSListing listing = deserialize(serializedState);
++                    this.lastListingTime = 
listing.getLatestTimestamp().getTime();
++                    minTimestamp = listing.getLatestTimestamp().getTime();
++                    this.latestPathsListed = listing.toPaths();
++                }
++
++                this.lastListingTime = minTimestamp;
++                electedPrimaryNode = false; // no requirement to pull an 
update from the distributed cache anymore.
++            } catch (final IOException ioe) {
++                getLogger().error("Failed to retrieve timestamp of last 
listing from Distributed Cache Service. Will not perform listing until this is 
accomplished.");
++                context.yield();
++                return;
++            }
++
++            // Check the persistence file. We want to use the latest 
timestamp that we have so that
++            // we don't duplicate data.
++            try {
++                if ( persistenceFile.exists() ) {
++                    try (final FileInputStream fis = new 
FileInputStream(persistenceFile)) {
++                        final Properties props = new Properties();
++                        props.load(fis);
++
++                        // get the local timestamp for this directory, if it 
exists.
++                        final String locallyPersistedValue = 
props.getProperty(directory);
++                        if ( locallyPersistedValue != null ) {
++                            final HDFSListing listing = 
deserialize(locallyPersistedValue);
++                            final long localTimestamp = 
listing.getLatestTimestamp().getTime();
++
++                            // If distributed state doesn't have an entry or 
the local entry is later than the distributed state,
++                            // update the distributed state so that we are in 
sync.
++                            if (minTimestamp == null || localTimestamp > 
minTimestamp) {
++                                minTimestamp = localTimestamp;
++
++                                // Our local persistence file shows a later 
time than the Distributed service.
++                                // Update the distributed service to match 
our local state.
++                                try {
++                                    final StringSerDe serde = new 
StringSerDe();
++                                    client.put(getKey(directory), 
locallyPersistedValue, serde, serde);
++                                } catch (final IOException ioe) {
++                                    getLogger().warn("Local timestamp for {} 
is {}, which is later than Distributed state but failed to update Distributed "
++                                            + "state due to {}. If a new node 
performs HDFS Listing, data duplication may occur",
++                                            new Object[] {directory, 
locallyPersistedValue, ioe});
++                                }
++                            }
++                        }
++                    }
++                }
++            } catch (final IOException ioe) {
++                getLogger().warn("Failed to recover local state due to {}. 
Assuming that the state from the distributed cache is correct.", ioe);
++            }
++        }
++
++
++        // Pull in any file that is newer than the timestamp that we have.
++        final FileSystem hdfs = hdfsResources.get().getValue();
++        final boolean recursive = 
context.getProperty(RECURSE_SUBDIRS).asBoolean();
++        final Path rootPath = new Path(directory);
++
++        int listCount = 0;
++        Long latestListingModTime = null;
++        final Set<FileStatus> statuses;
++        try {
++            statuses = getStatuses(rootPath, recursive, hdfs);
++            for ( final FileStatus status : statuses ) {
++                // don't get anything where the last modified timestamp is 
equal to our current timestamp.
++                // if we do, then we run the risk of multiple files having 
the same last mod date but us only
++                // seeing a portion of them.
++                // I.e., there could be 5 files with last mod date = (now). 
But if we do the listing now, maybe
++                // only 2 exist and 3 more will exist later in this 
millisecond. So we ignore anything with a
++                // modified date not before the current time.
++                final long fileModTime = status.getModificationTime();
++
++                // we only want the file if its timestamp is later than the 
minTimestamp or equal to and we didn't pull it last time.
++                // Also, HDFS creates files with the suffix _COPYING_ when 
they are being written - we want to ignore those.
++                boolean fetch = 
!status.getPath().getName().endsWith("_COPYING_")
++                        && (minTimestamp == null || fileModTime > 
minTimestamp || (fileModTime == minTimestamp && 
!latestPathsListed.contains(status.getPath())));
++
++                // Create the FlowFile for this path.
++                if ( fetch ) {
++                    final Map<String, String> attributes = 
createAttributes(status);
++                    FlowFile flowFile = session.create();
++                    flowFile = session.putAllAttributes(flowFile, attributes);
++                    session.transfer(flowFile, REL_SUCCESS);
++                    listCount++;
++
++                    if ( latestListingModTime == null || fileModTime > 
latestListingModTime ) {
++                        latestListingModTime = fileModTime;
++                    }
++                }
++            }
++        } catch (final IOException ioe) {
++            getLogger().error("Failed to perform listing of HDFS due to {}", 
new Object[] {ioe});
++            return;
++        }
++
++        if ( listCount > 0 ) {
++            getLogger().info("Successfully created listing with {} new files 
from HDFS", new Object[] {listCount});
++            session.commit();
++
++            // We have performed a listing and pushed the FlowFiles out.
++            // Now, we need to persist state about the Last Modified 
timestamp of the newest file
++            // that we pulled in. We do this in order to avoid pulling in the 
same file twice.
++            // However, we want to save the state both locally and remotely.
++            // We store the state remotely so that if a new Primary Node is 
chosen, it can pick up where the
++            // previously Primary Node left off.
++            // We also store the state locally so that if the node is 
restarted, and the node cannot contact
++            // the distributed state cache, the node can continue to run (if 
it is primary node).
++            String serializedState = null;
++            try {
++                serializedState = serializeState(latestListingModTime, 
statuses);
++            } catch (final Exception e) {
++                getLogger().error("Failed to serialize state due to {}", new 
Object[] {e});
++            }
++
++            if ( serializedState != null ) {
++                // Save our state locally.
++                try {
++                    persistLocalState(directory, serializedState);
++                } catch (final IOException ioe) {
++                    getLogger().warn("Unable to save state locally. If the 
node is restarted now, data may be duplicated. Failure is due to {}", ioe);
++                }
++
++                // Attempt to save state to remote server.
++                final DistributedMapCacheClient client = 
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
++                try {
++                    client.put(getKey(directory), serializedState, new 
StringSerDe(), new StringSerDe());
++                } catch (final IOException ioe) {
++                    getLogger().warn("Unable to communicate with distributed 
cache server due to {}. Persisting state locally instead.", ioe);
++                }
++            }
++
++            lastListingTime = latestListingModTime;
++        } else {
++            getLogger().debug("There is no data to list. Yielding.");
++            context.yield();
++
++            // lastListingTime = 0 so that we don't continually poll the 
distributed cache / local file system
++            if ( lastListingTime == null ) {
++                lastListingTime = 0L;
++            }
++
++            return;
++        }
++    }
++
++    private Set<FileStatus> getStatuses(final Path path, final boolean 
recursive, final FileSystem hdfs) throws IOException {
++        final Set<FileStatus> statusSet = new HashSet<>();
++
++        final FileStatus[] statuses = hdfs.listStatus(path);
++
++        for ( final FileStatus status : statuses ) {
++            if ( status.isDirectory() ) {
++                if ( recursive ) {
++                    try {
++                        statusSet.addAll(getStatuses(status.getPath(), 
recursive, hdfs));
++                    } catch (final IOException ioe) {
++                        getLogger().error("Failed to retrieve HDFS listing 
for subdirectory {} due to {}; will continue listing others", new Object[] 
{status.getPath(), ioe});
++                    }
++                }
++            } else {
++                statusSet.add(status);
++            }
++        }
++
++        return statusSet;
++    }
++
++
++    private String serializeState(final long latestListingTime, final 
Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, 
IOException {
++        // we need to keep track of all files that we pulled in that had a 
modification time equal to
++        // lastListingTime so that we can avoid pulling those files in again. 
We can't just ignore any files
++        // that have a mod time equal to that timestamp because more files 
may come in with the same timestamp
++        // later in the same millisecond.
++        if ( statuses.isEmpty() ) {
++            return null;
++        } else {
++            final List<FileStatus> sortedStatuses = new ArrayList<>(statuses);
++            Collections.sort(sortedStatuses, new Comparator<FileStatus>() {
++                @Override
++                public int compare(final FileStatus o1, final FileStatus o2) {
++                    return Long.compare(o1.getModificationTime(), 
o2.getModificationTime());
++                }
++            });
++
++            final long latestListingModTime = 
sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime();
++            final Set<Path> pathsWithModTimeEqualToListingModTime = new 
HashSet<>();
++            for (int i=sortedStatuses.size() - 1; i >= 0; i--) {
++                final FileStatus status = sortedStatuses.get(i);
++                if (status.getModificationTime() == latestListingModTime) {
++                    
pathsWithModTimeEqualToListingModTime.add(status.getPath());
++                }
++            }
++
++            this.latestPathsListed = pathsWithModTimeEqualToListingModTime;
++
++            final HDFSListing listing = new HDFSListing();
++            listing.setLatestTimestamp(new Date(latestListingModTime));
++            final Set<String> paths = new HashSet<>();
++            for ( final Path path : pathsWithModTimeEqualToListingModTime ) {
++                paths.add(path.toUri().toString());
++            }
++            listing.setMatchingPaths(paths);
++
++            final ObjectMapper mapper = new ObjectMapper();
++            final String serializedState = 
mapper.writerWithType(HDFSListing.class).writeValueAsString(listing);
++            return serializedState;
++        }
++    }
++
++    private void persistLocalState(final String directory, final String 
serializedState) throws IOException {
++        // we need to keep track of all files that we pulled in that had a 
modification time equal to
++        // lastListingTime so that we can avoid pulling those files in again. 
We can't just ignore any files
++        // that have a mod time equal to that timestamp because more files 
may come in with the same timestamp
++        // later in the same millisecond.
++        final File dir = persistenceFile.getParentFile();
++        if ( !dir.exists() && !dir.mkdirs() ) {
++            throw new IOException("Could not create directory " + 
dir.getAbsolutePath() + " in order to save local state");
++        }
++
++        final Properties props = new Properties();
++        if ( persistenceFile.exists() ) {
++            try (final FileInputStream fis = new 
FileInputStream(persistenceFile)) {
++                props.load(fis);
++            }
++        }
++
++        props.setProperty(directory, serializedState);
++
++        try (final FileOutputStream fos = new 
FileOutputStream(persistenceFile)) {
++            props.store(fos, null);
++        }
++    }
++
++    private String getAbsolutePath(final Path path) {
++        final Path parent = path.getParent();
++        final String prefix = (parent == null || parent.getName().equals("")) 
? "" : getAbsolutePath(parent);
++        return prefix + "/" + path.getName();
++    }
++
++    private Map<String, String> createAttributes(final FileStatus status) {
++        final Map<String, String> attributes = new HashMap<>();
++        attributes.put(CoreAttributes.FILENAME.key(), 
status.getPath().getName());
++        attributes.put(CoreAttributes.PATH.key(), 
getAbsolutePath(status.getPath().getParent()));
++
++        attributes.put("hdfs.owner", status.getOwner());
++        attributes.put("hdfs.group", status.getGroup());
++        attributes.put("hdfs.lastModified", 
String.valueOf(status.getModificationTime()));
++        attributes.put("hdfs.length", String.valueOf(status.getLen()));
++        attributes.put("hdfs.replication", 
String.valueOf(status.getReplication()));
++
++        final FsPermission permission = status.getPermission();
++        final String perms = getPerms(permission.getUserAction()) + 
getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
++        attributes.put("hdfs.permissions", perms);
++        return attributes;
++    }
++
++    private String getPerms(final FsAction action) {
++        final StringBuilder sb = new StringBuilder();
++        if ( action.implies(FsAction.READ) ) {
++            sb.append("r");
++        } else {
++            sb.append("-");
++        }
++
++        if ( action.implies(FsAction.WRITE) ) {
++            sb.append("w");
++        } else {
++            sb.append("-");
++        }
++
++        if ( action.implies(FsAction.EXECUTE) ) {
++            sb.append("x");
++        } else {
++            sb.append("-");
++        }
++
++        return sb.toString();
++    }
 +}
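
A note on the listing logic above: a file is emitted only when its modification time is strictly newer than the persisted timestamp, or equal to it but not already emitted in the previous run, and never while HDFS is still writing it under the _COPYING_ suffix. A minimal, self-contained sketch of that rule follows; the class and field names are illustrative and not part of the patch.

    import java.util.HashSet;
    import java.util.Set;

    // Illustrative sketch of the "newer, or same millisecond but not yet seen" rule.
    class ListingFilter {
        private Long minTimestamp;                                      // latest mod time persisted from the previous listing
        private final Set<String> latestPathsListed = new HashSet<>();  // paths already emitted at that timestamp

        boolean shouldFetch(final String fileName, final long modTime) {
            if (fileName.endsWith("_COPYING_")) {
                return false;   // HDFS uses this suffix while the file is still being written
            }
            if (minTimestamp == null || modTime > minTimestamp) {
                return true;
            }
            // same millisecond as the previous listing: fetch only if we have not seen it yet
            return modTime == minTimestamp && !latestPathsListed.contains(fileName);
        }
    }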

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
index 9f4d68b,0000000..49957f5
mode 100644,000000..100644
--- 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
+++ 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
@@@ -1,83 -1,0 +1,83 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.nifi.processors.hadoop.util;
 +
 +import java.util.Collection;
 +import java.util.Date;
 +import java.util.HashSet;
 +import java.util.Set;
 +
 +import javax.xml.bind.annotation.XmlTransient;
 +import javax.xml.bind.annotation.XmlType;
 +
 +import org.apache.hadoop.fs.Path;
 +
 +/**
 + * A simple POJO for maintaining state about the last HDFS Listing that was 
performed so that
-  * we can avoid pulling the same file multiple times 
++ * we can avoid pulling the same file multiple times
 + */
 +@XmlType(name = "listing")
 +public class HDFSListing {
-       private Date latestTimestamp;
-       private Collection<String> matchingPaths;
-       
-       /**
-        * @return the modification date of the newest file that was contained 
in the HDFS Listing
-        */
-       public Date getLatestTimestamp() {
-               return latestTimestamp;
-       }
-       
-       /**
-        * Sets the timestamp of the modification date of the newest file that 
was contained in the HDFS Listing
-        * 
-        * @param latestTimestamp the timestamp of the modification date of the 
newest file that was contained in the HDFS Listing
-        */
-       public void setLatestTimestamp(Date latestTimestamp) {
-               this.latestTimestamp = latestTimestamp;
-       }
-       
-       /**
-        * @return a Collection containing the paths of all files in the HDFS 
Listing whose Modification date
-        * was equal to {@link #getLatestTimestamp()}
-        */
-       @XmlTransient
-       public Collection<String> getMatchingPaths() {
-               return matchingPaths;
-       }
-       
-       /**
-        * @return a Collection of {@link Path} objects equivalent to those 
returned by {@link #getMatchingPaths()}
-        */
-       public Set<Path> toPaths() {
-               final Set<Path> paths = new HashSet<>(matchingPaths.size());
-               for ( final String pathname : matchingPaths ) {
-                       paths.add(new Path(pathname));
-               }
-               return paths;
-       }
++    private Date latestTimestamp;
++    private Collection<String> matchingPaths;
 +
-       /**
-        * Sets the Collection containing the paths of all files in the HDFS 
Listing whose Modification Date was
-        * equal to {@link #getLatestTimestamp()}
-        * @param matchingPaths
-        */
-       public void setMatchingPaths(Collection<String> matchingPaths) {
-               this.matchingPaths = matchingPaths;
-       }
++    /**
++     * @return the modification date of the newest file that was contained in 
the HDFS Listing
++     */
++    public Date getLatestTimestamp() {
++        return latestTimestamp;
++    }
++
++    /**
++     * Sets the timestamp of the modification date of the newest file that 
was contained in the HDFS Listing
++     *
++     * @param latestTimestamp the timestamp of the modification date of the 
newest file that was contained in the HDFS Listing
++     */
++    public void setLatestTimestamp(Date latestTimestamp) {
++        this.latestTimestamp = latestTimestamp;
++    }
++
++    /**
++     * @return a Collection containing the paths of all files in the HDFS 
Listing whose Modification date
++     * was equal to {@link #getLatestTimestamp()}
++     */
++    @XmlTransient
++    public Collection<String> getMatchingPaths() {
++        return matchingPaths;
++    }
++
++    /**
++     * @return a Collection of {@link Path} objects equivalent to those 
returned by {@link #getMatchingPaths()}
++     */
++    public Set<Path> toPaths() {
++        final Set<Path> paths = new HashSet<>(matchingPaths.size());
++        for ( final String pathname : matchingPaths ) {
++            paths.add(new Path(pathname));
++        }
++        return paths;
++    }
++
++    /**
++     * Sets the Collection containing the paths of all files in the HDFS 
Listing whose Modification Date was
++     * equal to {@link #getLatestTimestamp()}
++     * @param matchingPaths the paths that have last modified date matching 
the latest timestamp
++     */
++    public void setMatchingPaths(Collection<String> matchingPaths) {
++        this.matchingPaths = matchingPaths;
++    }
 +
 +}
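
For reference, the state string produced by serializeState() is just this POJO written out with Jackson. A round-trip sketch follows; the timestamp, the paths, and the Jackson import are illustrative, since the exact Jackson package in use is not visible in this hunk.

    import java.util.Arrays;
    import java.util.Date;

    import com.fasterxml.jackson.databind.ObjectMapper;  // assumption: the mapper package is not shown in this hunk

    public class HDFSListingRoundTrip {
        public static void main(final String[] args) throws Exception {
            final HDFSListing listing = new HDFSListing();
            listing.setLatestTimestamp(new Date(1420070400000L));                  // made-up timestamp
            listing.setMatchingPaths(Arrays.asList(
                    "hdfs://nn:8020/tmp/a.txt", "hdfs://nn:8020/tmp/b.txt"));      // made-up paths

            final ObjectMapper mapper = new ObjectMapper();
            final String json = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing);

            // recover the state the same way the processor would after a restart
            final HDFSListing recovered = mapper.readValue(json, HDFSListing.class);
            System.out.println(recovered.getLatestTimestamp());
        }
    }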

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
index ef0e590,0000000..229f26c
mode 100644,000000..100644
--- 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
+++ 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
@@@ -1,48 -1,0 +1,48 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.nifi.processors.hadoop.util;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.io.OutputStream;
 +
 +import org.apache.nifi.distributed.cache.client.Deserializer;
 +import org.apache.nifi.distributed.cache.client.Serializer;
 +import 
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
 +import 
org.apache.nifi.distributed.cache.client.exception.SerializationException;
 +
 +public class LongSerDe implements Serializer<Long>, Deserializer<Long> {
 +
-       @Override
-       public Long deserialize(final byte[] input) throws 
DeserializationException, IOException {
-               if ( input == null || input.length == 0 ) {
-                       return null;
-               }
-               
-               final DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(input));
-               return dis.readLong();
-       }
++    @Override
++    public Long deserialize(final byte[] input) throws 
DeserializationException, IOException {
++        if ( input == null || input.length == 0 ) {
++            return null;
++        }
 +
-       @Override
-       public void serialize(final Long value, final OutputStream out) throws 
SerializationException, IOException {
-               final DataOutputStream dos = new DataOutputStream(out);
-               dos.writeLong(value);
-       }
++        final DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(input));
++        return dis.readLong();
++    }
++
++    @Override
++    public void serialize(final Long value, final OutputStream out) throws 
SerializationException, IOException {
++        final DataOutputStream dos = new DataOutputStream(out);
++        dos.writeLong(value);
++    }
 +
 +}
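
LongSerDe simply delegates to DataOutputStream and DataInputStream, so a value round-trips as eight big-endian bytes. A quick illustrative check:

    import java.io.ByteArrayOutputStream;

    public class LongSerDeRoundTrip {
        public static void main(final String[] args) throws Exception {
            final LongSerDe serDe = new LongSerDe();

            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            serDe.serialize(42L, baos);                                    // writes the 8-byte big-endian form
            System.out.println(serDe.deserialize(baos.toByteArray()));     // prints 42

            System.out.println(serDe.deserialize(new byte[0]));            // prints null: empty input means "no value stored"
        }
    }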

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
index 848831f,0000000..ca1c548
mode 100644,000000..100644
--- 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
+++ 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
@@@ -1,44 -1,0 +1,44 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.nifi.processors.hadoop.util;
 +
 +import java.io.IOException;
 +import java.io.OutputStream;
 +import java.nio.charset.StandardCharsets;
 +
 +import org.apache.nifi.distributed.cache.client.Deserializer;
 +import org.apache.nifi.distributed.cache.client.Serializer;
 +import 
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
 +import 
org.apache.nifi.distributed.cache.client.exception.SerializationException;
 +
 +public class StringSerDe implements Serializer<String>, Deserializer<String> {
 +
-       @Override
-       public String deserialize(final byte[] value) throws 
DeserializationException, IOException {
-               if ( value == null ) {
-                       return null;
-               }
-               
-               return new String(value, StandardCharsets.UTF_8);
-       }
++    @Override
++    public String deserialize(final byte[] value) throws 
DeserializationException, IOException {
++        if ( value == null ) {
++            return null;
++        }
 +
-       @Override
-       public void serialize(final String value, final OutputStream out) 
throws SerializationException, IOException {
-               out.write(value.getBytes(StandardCharsets.UTF_8));
-       }
++        return new String(value, StandardCharsets.UTF_8);
++    }
++
++    @Override
++    public void serialize(final String value, final OutputStream out) throws 
SerializationException, IOException {
++        out.write(value.getBytes(StandardCharsets.UTF_8));
++    }
 +
 +}
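
StringSerDe is the serializer pair the listing processor hands to the distributed cache client for both keys and values; it is nothing more than the raw UTF-8 bytes of the string, with no length prefix. A quick round trip:

    import java.io.ByteArrayOutputStream;

    public class StringSerDeRoundTrip {
        public static void main(final String[] args) throws Exception {
            final StringSerDe serDe = new StringSerDe();

            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            serDe.serialize("hdfs listing state", baos);                   // raw UTF-8 bytes
            System.out.println(serDe.deserialize(baos.toByteArray()));     // prints "hdfs listing state"
        }
    }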

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
index 975dc63,d816e8c..ea3bb63
--- 
a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
@@@ -83,31 -86,15 +86,34 @@@ public interface DistributedMapCacheCli
      <K> boolean containsKey(K key, Serializer<K> keySerializer) throws 
IOException;
  
      /**
 -     * @param <K> type of key
 -     * @param <V> type of value
 +     * Adds the specified key and value to the cache, overwriting any value 
that is
 +     * currently set.
-      * 
++     *
++     * @param <K> the key type
++     * @param <V> the value type
 +     * @param key The key to set
 +     * @param value The value to associate with the given Key
 +     * @param keySerializer the Serializer that will be used to serialize the 
key into bytes
 +     * @param valueSerializer the Serializer that will be used to serialize 
the value into bytes
-      * 
++     *
 +     * @throws IOException if unable to communicate with the remote instance
 +     * @throws NullPointerException if the key or either serializer is null
 +     */
 +    <K, V> void put(K key, V value, Serializer<K> keySerializer, 
Serializer<V> valueSerializer) throws IOException;
-     
++
 +    /**
 +     * Returns the value in the cache for the given key, if one exists;
 +     * otherwise returns <code>null</code>
 +     *
-      * @param <K>
-      * @param <V>
++     * @param <K> the key type
++     * @param <V> the value type
       * @param key the key to lookup in the map
-      * @param keySerializer
-      * @param valueDeserializer
+      * @param keySerializer key serializer
+      * @param valueDeserializer value deserializer
       *
-      * @return
-      * @throws IOException
+      * @return the value in the cache for the given key, if one exists;
+      * otherwise returns <code>null</code>
+      * @throws IOException if unable to communicate with the remote instance
       */
      <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> 
valueDeserializer) throws IOException;
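
Together with the StringSerDe above, the new put() and the existing get() give the listing processor a simple overwrite-then-lookup pattern against the cache. A hedged usage sketch follows; the cache key format shown here is made up, since the real key comes from getKey(directory) in the processor and is not part of this hunk.

    import java.io.IOException;

    import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    import org.apache.nifi.processors.hadoop.util.StringSerDe;

    // Hedged sketch of how the listing state can be published to and recovered from the cache.
    final class ListingStateClient {
        private static final StringSerDe STRING_SER_DE = new StringSerDe();

        static void saveState(final DistributedMapCacheClient client, final String directory, final String state) throws IOException {
            // overwrite whatever state is currently stored for this directory
            client.put("hdfs-listing." + directory, state, STRING_SER_DE, STRING_SER_DE);
        }

        static String loadState(final DistributedMapCacheClient client, final String directory) throws IOException {
            // null means no state has been stored for this directory yet
            return client.get("hdfs-listing." + directory, STRING_SER_DE, STRING_SER_DE);
        }
    }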
  

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
index 8903046,fad0adb..e9c6f1d
--- 
a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
@@@ -22,9 -22,12 +22,14 @@@ import java.nio.ByteBuffer
  public interface MapCache {
  
      MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws 
IOException;
+ 
 +    MapPutResult put(ByteBuffer key, ByteBuffer value) throws IOException;
++
      boolean containsKey(ByteBuffer key) throws IOException;
+ 
      ByteBuffer get(ByteBuffer key) throws IOException;
+ 
      ByteBuffer remove(ByteBuffer key) throws IOException;
+ 
      void shutdown() throws IOException;
  }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
index cf8996c,943d6aa..13ed0df
--- 
a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
@@@ -55,70 -55,63 +55,70 @@@ public class MapCacheServer extends Abs
          final String action = dis.readUTF();
          try {
              switch (action) {
 -                case "close": {
 -                    return false;
 -                }
 -                case "putIfAbsent": {
 -                    final byte[] key = readValue(dis);
 -                    final byte[] value = readValue(dis);
 -                    final MapPutResult putResult = 
cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
 -                    dos.writeBoolean(putResult.isSuccessful());
 -                    break;
 -                }
 -                case "containsKey": {
 -                    final byte[] key = readValue(dis);
 -                    final boolean contains = 
cache.containsKey(ByteBuffer.wrap(key));
 -                    dos.writeBoolean(contains);
 -                    break;
 -                }
 -                case "getAndPutIfAbsent": {
 -                    final byte[] key = readValue(dis);
 -                    final byte[] value = readValue(dis);
 -
 -                    final MapPutResult putResult = 
cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
 -                    if (putResult.isSuccessful()) {
 -                        // Put was successful. There was no old value to get.
 -                        dos.writeInt(0);
 -                    } else {
 -                        // we didn't put. Write back the previous value
 -                        final byte[] byteArray = 
putResult.getExistingValue().array();
 -                        dos.writeInt(byteArray.length);
 -                        dos.write(byteArray);
 -                    }
 -
 -                    break;
 -                }
 -                case "get": {
 -                    final byte[] key = readValue(dis);
 -                    final ByteBuffer existingValue = 
cache.get(ByteBuffer.wrap(key));
 -                    if (existingValue == null) {
 -                        // there was no existing value; we did a "put".
 -                        dos.writeInt(0);
 -                    } else {
 -                        // a value already existed. we did not update the map
 -                        final byte[] byteArray = existingValue.array();
 -                        dos.writeInt(byteArray.length);
 -                        dos.write(byteArray);
 -                    }
 -
 -                    break;
 -                }
 -                case "remove": {
 -                    final byte[] key = readValue(dis);
 -                    final boolean removed = 
cache.remove(ByteBuffer.wrap(key)) != null;
 -                    dos.writeBoolean(removed);
 -                    break;
 +            case "close": {
 +                return false;
 +            }
 +            case "putIfAbsent": {
 +                final byte[] key = readValue(dis);
 +                final byte[] value = readValue(dis);
 +                final MapPutResult putResult = 
cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
 +                dos.writeBoolean(putResult.isSuccessful());
 +                break;
 +            }
 +            case "put": {
-               final byte[] key = readValue(dis);
-               final byte[] value = readValue(dis);
-               cache.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
++                final byte[] key = readValue(dis);
++                final byte[] value = readValue(dis);
++                cache.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
 +                dos.writeBoolean(true);
-               break;
++                break;
 +            }
 +            case "containsKey": {
 +                final byte[] key = readValue(dis);
 +                final boolean contains = 
cache.containsKey(ByteBuffer.wrap(key));
 +                dos.writeBoolean(contains);
 +                break;
 +            }
 +            case "getAndPutIfAbsent": {
 +                final byte[] key = readValue(dis);
 +                final byte[] value = readValue(dis);
 +
 +                final MapPutResult putResult = 
cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
 +                if (putResult.isSuccessful()) {
 +                    // Put was successful. There was no old value to get.
 +                    dos.writeInt(0);
 +                } else {
 +                    // we didn't put. Write back the previous value
 +                    final byte[] byteArray = 
putResult.getExistingValue().array();
 +                    dos.writeInt(byteArray.length);
 +                    dos.write(byteArray);
                  }
 -                default: {
 -                    throw new IOException("Illegal Request");
 +
 +                break;
 +            }
 +            case "get": {
 +                final byte[] key = readValue(dis);
 +                final ByteBuffer existingValue = 
cache.get(ByteBuffer.wrap(key));
 +                if (existingValue == null) {
 +                    // there was no existing value; we did a "put".
 +                    dos.writeInt(0);
 +                } else {
 +                    // a value already existed. we did not update the map
 +                    final byte[] byteArray = existingValue.array();
 +                    dos.writeInt(byteArray.length);
 +                    dos.write(byteArray);
                  }
 +
 +                break;
 +            }
 +            case "remove": {
 +                final byte[] key = readValue(dis);
 +                final boolean removed = cache.remove(ByteBuffer.wrap(key)) != 
null;
 +                dos.writeBoolean(removed);
 +                break;
 +            }
 +            default: {
 +                throw new IOException("Illegal Request");
 +            }
              }
          } finally {
              dos.flush();
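
For the new "put" action the request framing matches the existing actions: the action name via writeUTF, then the key and the value, with a single boolean acknowledgement in response. readValue() is not shown in this hunk, so the length-prefixed framing in the sketch below is an assumption inferred from how responses are written (writeInt of the length followed by the raw bytes).

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Hedged sketch of the client side of the new "put" action.
    final class PutRequestSketch {

        static boolean sendPut(final DataOutputStream dos, final DataInputStream dis,
                final byte[] key, final byte[] value) throws IOException {
            dos.writeUTF("put");        // action name dispatched by the switch in MapCacheServer above
            writeValue(dos, key);
            writeValue(dos, value);
            dos.flush();
            return dis.readBoolean();   // the server acknowledges the overwrite with "true"
        }

        // assumption: mirrors how the server writes values back (length, then bytes)
        private static void writeValue(final DataOutputStream dos, final byte[] bytes) throws IOException {
            dos.writeInt(bytes.length);
            dos.write(bytes);
        }
    }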

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
index 82b1787,e821fbf..663f441
--- 
a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
@@@ -75,33 -75,9 +75,33 @@@ public class PersistentMapCache impleme
                  wali.checkpoint();
              }
          }
-         
+ 
          return putResult;
      }
 +    
 +    @Override
 +    public MapPutResult put(final ByteBuffer key, final ByteBuffer value) 
throws IOException {
 +        final MapPutResult putResult = wrapped.put(key, value);
 +        if ( putResult.isSuccessful() ) {
 +            // The put was successful.
 +            final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, 
key, value);
 +            final List<MapWaliRecord> records = new ArrayList<>();
 +            records.add(record);
 +
 +            if ( putResult.getEvictedKey() != null ) {
 +                records.add(new MapWaliRecord(UpdateType.DELETE, 
putResult.getEvictedKey(), putResult.getEvictedValue()));
 +            }
 +            
 +            wali.update(records, false);
 +            
 +            final long modCount = modifications.getAndIncrement();
 +            if ( modCount > 0 && modCount % 100000 == 0 ) {
 +                wali.checkpoint();
 +            }
 +        }
 +        
 +        return putResult;
 +    }
  
      @Override
      public boolean containsKey(final ByteBuffer key) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/dc7f7a82/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
index d8f9c45,9e8bbd1..b167c62
--- 
a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
@@@ -105,29 -106,7 +106,29 @@@ public class SimpleMapCache implements 
              writeLock.unlock();
          }
      }
-     
+ 
 +
 +    @Override
 +    public MapPutResult put(final ByteBuffer key, final ByteBuffer value) {
 +        writeLock.lock();
 +        try {
-               // evict if we need to in order to make room for a new entry.
++            // evict if we need to in order to make room for a new entry.
 +            final MapCacheRecord evicted = evict();
 +
 +            final MapCacheRecord record = new MapCacheRecord(key, value);
-               final MapCacheRecord existing = cache.put(key, record);
-               inverseCacheMap.put(record, key);
-               
-               final ByteBuffer existingValue = (existing == null) ? null : 
existing.getValue();
-               final ByteBuffer evictedKey = (evicted == null) ? null : 
evicted.getKey();
-               final ByteBuffer evictedValue = (evicted == null) ? null : 
evicted.getValue();
-               
-               return new MapPutResult(true, key, value, existingValue, 
evictedKey, evictedValue);
++            final MapCacheRecord existing = cache.put(key, record);
++            inverseCacheMap.put(record, key);
++
++            final ByteBuffer existingValue = (existing == null) ? null : 
existing.getValue();
++            final ByteBuffer evictedKey = (evicted == null) ? null : 
evicted.getKey();
++            final ByteBuffer evictedValue = (evicted == null) ? null : 
evicted.getValue();
++
++            return new MapPutResult(true, key, value, existingValue, 
evictedKey, evictedValue);
 +        } finally {
 +            writeLock.unlock();
 +        }
 +    }
-     
++
      @Override
      public boolean containsKey(final ByteBuffer key) {
          readLock.lock();
