Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 20f11b1a7 -> 25e2b6c8a


NIFI-533: Initial implementation of FetchHDFS and ListHDFS


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/94945a6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/94945a6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/94945a6f

Branch: refs/heads/develop
Commit: 94945a6fd5e88c083f1f3e32d3b3d5d5954686d1
Parents: 6fa5968
Author: Mark Payne <[email protected]>
Authored: Fri Apr 24 20:13:21 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Fri Apr 24 20:13:21 2015 -0400

----------------------------------------------------------------------
 .../notification/OnPrimaryNodeStateChange.java  |  44 ++
 .../notification/PrimaryNodeState.java          |  33 ++
 .../apache/nifi/controller/FlowController.java  |  16 +-
 .../org/apache/nifi/web/util/SnippetUtils.java  |   2 +-
 .../nifi-hdfs-processors/pom.xml                |   8 +-
 .../hadoop/AbstractHadoopProcessor.java         |  27 ++
 .../nifi/processors/hadoop/FetchHDFS.java       | 126 +++++
 .../apache/nifi/processors/hadoop/GetHDFS.java  |  36 +-
 .../apache/nifi/processors/hadoop/ListHDFS.java | 466 +++++++++++++++++++
 .../processors/hadoop/util/HDFSListing.java     |  83 ++++
 .../nifi/processors/hadoop/util/LongSerDe.java  |  48 ++
 .../processors/hadoop/util/StringSerDe.java     |  44 ++
 .../org.apache.nifi.processor.Processor         |   6 +-
 .../standard/TestDetectDuplicate.java           |   5 +
 .../cache/client/DistributedMapCacheClient.java |  14 +
 .../DistributedMapCacheClientService.java       |  22 +
 .../distributed/cache/server/map/MapCache.java  |   1 +
 .../cache/server/map/MapCacheServer.java        |   7 +
 .../cache/server/map/PersistentMapCache.java    |  24 +
 .../cache/server/map/SimpleMapCache.java        |  22 +
 20 files changed, 996 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java
new file mode 100644
index 0000000..e073660
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.annotation.notification;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * <p>
+ * Marker annotation that a component can use to indicate that a method should be
+ * called whenever the state of the Primary Node in a cluster has changed.
+ * </p>
+ * 
+ * <p>
+ * Methods with this annotation should take either no arguments or one argument of type
+ * {@link PrimaryNodeState}. The {@link PrimaryNodeState} provides context about what changed
+ * so that the component can take appropriate action.
+ * </p>
+ */
+@Documented
+@Target({ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface OnPrimaryNodeStateChange {
+
+}
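
For illustration, a minimal sketch of a processor that reacts to the new annotation (the class below is hypothetical and not part of this commit; only the annotation, enum, and framework callback are):

    import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
    import org.apache.nifi.annotation.notification.PrimaryNodeState;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;

    public class PrimaryNodeAwareProcessor extends AbstractProcessor {
        private volatile boolean primary = false;

        // The framework invokes this whenever this node's Primary Node role changes.
        @OnPrimaryNodeStateChange
        public void onPrimaryNodeChange(final PrimaryNodeState newState) {
            primary = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
        }

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
            if (!primary) {
                context.yield();  // only do work while this node is primary
                return;
            }
            // ... perform primary-node-only work here ...
        }
    }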

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java
new file mode 100644
index 0000000..3a7245c
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.annotation.notification;
+
+/**
+ * Represents a state change that occurred for the Primary Node of a NiFi cluster.
+ */
+public enum PrimaryNodeState {
+       /**
+        * The node receiving this state has been elected the Primary Node of the NiFi cluster.
+        */
+       ELECTED_PRIMARY_NODE,
+       
+       /**
+        * The node receiving this state was the Primary Node but has now had its Primary Node
+        * role revoked.
+        */
+       PRIMARY_NODE_REVOKED;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index ec25ab1..ef9fe77 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -53,6 +53,8 @@ import org.apache.nifi.admin.service.UserService;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.cluster.BulletinsPayload;
 import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.protocol.DataFlow;
@@ -74,8 +76,8 @@ import org.apache.nifi.connectable.Position;
 import org.apache.nifi.connectable.Size;
 import org.apache.nifi.connectable.StandardConnection;
 import org.apache.nifi.controller.exception.CommunicationsException;
-import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.label.StandardLabel;
 import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
@@ -3098,6 +3100,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
             LOG.info("Setting primary flag from '" + this.primary + "' to '" + primary + "'");
 
+            final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
+            final ProcessGroup rootGroup = getGroup(getRootGroupId());
+            for (final ProcessorNode procNode : rootGroup.findAllProcessors()) {
+                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
+            }
+            for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
+                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
+            }
+            for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) {
+                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
+            }
+            
             // update primary
             this.primary = primary;
             eventDrivenWorkerQueue.setPrimary(primary);
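
The ReflectionUtils calls above locate every method on the component that carries @OnPrimaryNodeStateChange and invoke it, logging rather than propagating any failure. In essence (a simplified sketch of the idea, not NiFi's actual ReflectionUtils implementation):

    import java.lang.annotation.Annotation;
    import java.lang.reflect.Method;

    final class QuietAnnotationInvoker {
        // Invoke each annotated method with the given args if the signature accepts them,
        // or with no args if the method takes none; suppress any exception thrown.
        static void invokeQuietly(final Class<? extends Annotation> annotation, final Object target, final Object... args) {
            for (final Method method : target.getClass().getMethods()) {
                if (!method.isAnnotationPresent(annotation)) {
                    continue;
                }
                try {
                    if (method.getParameterTypes().length == 0) {
                        method.invoke(target);
                    } else if (method.getParameterTypes().length == args.length) {
                        method.invoke(target, args);
                    }
                } catch (final Exception e) {
                    // "quietly": a misbehaving component must not break the primary-node transition
                }
            }
        }
    }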

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
index 76789c6..d8cb69a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
@@ -257,7 +257,7 @@ public final class SnippetUtils {
             final PropertyDescriptor descriptor = entry.getKey();
             final String propertyValue = entry.getValue();
             
-            if ( descriptor.getControllerServiceDefinition() != null ) {
+            if ( descriptor.getControllerServiceDefinition() != null && propertyValue != null ) {
                 final ControllerServiceNode referencedNode = flowController.getControllerServiceNode(propertyValue);
                 if ( referencedNode == null ) {
                     throw new IllegalStateException("Controller Service with ID " + propertyValue + " is referenced in template but cannot be found");

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index 3ff1e88..ede32ab 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -42,8 +42,12 @@
             <groupId>org.apache.hadoop</groupId> 
             <artifactId>hadoop-common</artifactId> 
             <scope>provided</scope> 
-        </dependency> 
-        <dependency> 
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId> 
             <artifactId>hadoop-hdfs</artifactId> 
             <scope>provided</scope> 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 8d5749b..44ebbf8 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -217,4 +217,31 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
         };
     }
 
+    
+    /**
+     * Returns the relative path of the child that does not include the filename
+     * or the root path.
+     *
+     * @param root the root path
+     * @param child a path that is a descendant of the root
+     * @return the portion of the child's path between the root and the filename
+     */
+    public static String getPathDifference(final Path root, final Path child) {
+        final int depthDiff = child.depth() - root.depth();
+        if (depthDiff <= 1) {
+            return "".intern();
+        }
+        String lastRoot = root.getName();
+        Path childsParent = child.getParent();
+        final StringBuilder builder = new StringBuilder();
+        builder.append(childsParent.getName());
+        for (int i = (depthDiff - 3); i >= 0; i--) {
+            childsParent = childsParent.getParent();
+            String name = childsParent.getName();
+            if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) {
+                break;
+            }
+            builder.insert(0, Path.SEPARATOR).insert(0, name);
+        }
+        return builder.toString();
+    }
 }
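
As a quick illustration of the relocated helper's behavior (hypothetical values; the method itself is unchanged from GetHDFS):

    import org.apache.hadoop.fs.Path;
    import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;

    public class PathDifferenceDemo {
        public static void main(String[] args) {
            final Path root = new Path("/tmp");
            // Directly under the root: no intermediate directories, so the result is ""
            System.out.println(AbstractHadoopProcessor.getPathDifference(root, new Path("/tmp/file.txt")));
            // Nested under /tmp/abc/1/2/3: the filename and root are stripped, leaving "abc/1/2/3"
            System.out.println(AbstractHadoopProcessor.getPathDifference(root, new Path("/tmp/abc/1/2/3/file.txt")));
        }
    }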

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
new file mode 100644
index 0000000..06bb3c6
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+@SupportsBatching
+@Tags({"hadoop", "hdfs", "get", "ingest", "fetch", "source"})
+@CapabilityDescription("Retrieves a file from HDFS. The content of the incoming FlowFile is replaced by the content of the file in HDFS. "
+               + "The file in HDFS is left intact without any changes being made to it.")
+@WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added indicating why the file could "
+               + "not be fetched from HDFS")
+@SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class})
+public class FetchHDFS extends AbstractHadoopProcessor {
+
+       static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
+               .name("HDFS Filename")
+               .description("The name of the HDFS file to retrieve")
+               .required(true)
+               .expressionLanguageSupported(true)
+               .defaultValue("${path}/${filename}")
+               .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+               .build();
+
+       static final Relationship REL_SUCCESS = new Relationship.Builder()
+               .name("success")
+               .description("FlowFiles will be routed to this relationship once they have been updated with the content of the HDFS file")
+               .build();
+       static final Relationship REL_FAILURE = new Relationship.Builder()
+               .name("failure")
+               .description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieved and trying again will likely not be helpful. "
+                               + "This would occur, for instance, if the file is not found or if there is a permissions issue")
+               .build();
+       static final Relationship REL_COMMS_FAILURE = new Relationship.Builder()
+               .name("comms.failure")
+               .description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieved due to a communications failure. "
+                               + "This generally indicates that the Fetch should be tried again.")
+               .build();
+
+       @Override
+       protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+               final List<PropertyDescriptor> properties = new ArrayList<>();
+               properties.add(HADOOP_CONFIGURATION_RESOURCES);
+               properties.add(FILENAME);
+               return properties;
+       }
+       
+       @Override
+       public Set<Relationship> getRelationships() {
+               final Set<Relationship> relationships = new HashSet<>();
+               relationships.add(REL_SUCCESS);
+               relationships.add(REL_FAILURE);
+               relationships.add(REL_COMMS_FAILURE);
+               return relationships;
+       }
+       
+       @Override
+       public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+               FlowFile flowFile = session.get();
+               if ( flowFile == null ) {
+                       return;
+               }
+
+               final FileSystem hdfs = hdfsResources.get().getValue();
+               final Path path = new Path(context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue());
+               final URI uri = path.toUri();
+
+               final StopWatch stopWatch = new StopWatch(true);
+               try (final FSDataInputStream inStream = hdfs.open(path, 16384)) {
+                       flowFile = session.importFrom(inStream, flowFile);
+                       stopWatch.stop();
+                       getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()});
+                       session.getProvenanceReporter().modifyContent(flowFile, "Fetched content from " + uri, stopWatch.getDuration(TimeUnit.MILLISECONDS));
+                       session.transfer(flowFile, REL_SUCCESS);
+               } catch (final FileNotFoundException | AccessControlException e) {
+                       getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e});
+                       flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
+                       flowFile = session.penalize(flowFile);
+                       session.transfer(flowFile, REL_FAILURE);
+               } catch (final IOException e) {
+                       getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e});
+                       flowFile = session.penalize(flowFile);
+                       session.transfer(flowFile, REL_COMMS_FAILURE);
+               }
+       }
+
+}
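
A hedged sketch of how the new processor might be exercised with NiFi's mock framework (the test class and file path are illustrative, and a default locally-resolvable Hadoop configuration is assumed):

    package org.apache.nifi.processors.hadoop;  // same package, so the package-private relationships are visible

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class FetchHDFSSketch {
        public static void main(String[] args) {
            final TestRunner runner = TestRunners.newTestRunner(FetchHDFS.class);

            // The default HDFS Filename of ${path}/${filename} resolves against the
            // attributes that ListHDFS writes onto each FlowFile it emits.
            final Map<String, String> attrs = new HashMap<>();
            attrs.put("path", "/tmp/data");
            attrs.put("filename", "example.txt");
            runner.enqueue(new byte[0], attrs);
            runner.run();

            // If /tmp/data/example.txt does not exist, the FlowFile should route to
            // 'failure' and carry an hdfs.failure.reason attribute.
            runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1);
        }
    }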

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index d763c29..1dd5b91 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -58,16 +58,13 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 
-/**
- * This processor reads files from HDFS into NiFi FlowFiles.
- */
 @TriggerWhenEmpty
 @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"})
-@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles")
+@CapabilityDescription("Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles. This Processor will delete the file from HDFS after fetching it.")
 @WritesAttributes({
        @WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."),
        @WritesAttribute(attribute = "path", description = "The path is set to the relative path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".") })
-@SeeAlso(PutHDFS.class)
+@SeeAlso({PutHDFS.class, ListHDFS.class})
 public class GetHDFS extends AbstractHadoopProcessor {
 
     public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
@@ -104,7 +101,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
 
     public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder()
             .name("Keep Source File")
-            .description("Determines whether to delete the file from HDFS after it has been successfully transferred")
+            .description("Determines whether to delete the file from HDFS after it has been successfully transferred. If true, the file will be fetched repeatedly. This is intended for testing only.")
             .required(true)
             .allowableValues("true", "false")
             .defaultValue("false")
@@ -464,32 +461,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
         return files;
     }
 
-    /**
-     * Returns the relative path of the child that does not include the filename
-     * or the root path.
-     * @param root
-     * @param child
-     * @return 
-     */
-    public static String getPathDifference(final Path root, final Path child) {
-        final int depthDiff = child.depth() - root.depth();
-        if (depthDiff <= 1) {
-            return "".intern();
-        }
-        String lastRoot = root.getName();
-        Path childsParent = child.getParent();
-        final StringBuilder builder = new StringBuilder();
-        builder.append(childsParent.getName());
-        for (int i = (depthDiff - 3); i >= 0; i--) {
-            childsParent = childsParent.getParent();
-            String name = childsParent.getName();
-            if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) {
-                break;
-            }
-            builder.insert(0, Path.SEPARATOR).insert(0, name);
-        }
-        return builder.toString();
-    }
+    
 
     /**
      * Holder for a snapshot in time of some processor properties that are

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
new file mode 100644
index 0000000..707b50d
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.HDFSListing;
+import org.apache.nifi.processors.hadoop.util.StringSerDe;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+@TriggerSerially
+@TriggerWhenEmpty
+@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"})
+@CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, creates a FlowFile that represents "
+               + "the HDFS file so that it can be fetched in conjunction with FetchHDFS. This Processor is designed to run on Primary Node only "
+               + "in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating "
+               + "all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.")
+@WritesAttributes({
+               @WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."),
+               @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"/tmp\". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."),
+               @WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"),
+               @WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"),
+               @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
+               @WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"),
+               @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for the file"),
+               @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example rw-rw-r--")
+})
+@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
+public class ListHDFS extends AbstractHadoopProcessor {
+
+       public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
+               .name("Distributed Cache Service")
+               .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node begins pulling data, it won't duplicate all of the work that has been done.")
+               .required(true)
+               .identifiesControllerService(DistributedMapCacheClient.class)
+               .build();
+
+       public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
+               .name(DIRECTORY_PROP_NAME)
+               .description("The HDFS directory from which files should be read")
+               .required(true)
+               .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+               .build();
+
+       public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
+               .name("Recurse Subdirectories")
+               .description("Indicates whether to list files from subdirectories of the HDFS directory")
+               .required(true)
+               .allowableValues("true", "false")
+               .defaultValue("true")
+               .build();
+
+       public static final Relationship REL_SUCCESS = new Relationship.Builder()
+               .name("success")
+               .description("All FlowFiles are transferred to this relationship")
+               .build();
+
+    private volatile Long lastListingTime = null;
+    private volatile Set<Path> latestPathsListed = new HashSet<>();
+    private volatile boolean electedPrimaryNode = false;
+    private File persistenceFile = null;
+    
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+       super.init(context);
+       persistenceFile = new File("conf/state/" + getIdentifier());
+    }
+       
+       @Override
+       protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+               final List<PropertyDescriptor> properties = new ArrayList<>();
+               properties.add(HADOOP_CONFIGURATION_RESOURCES);
+               properties.add(DISTRIBUTED_CACHE_SERVICE);
+               properties.add(DIRECTORY);
+               properties.add(RECURSE_SUBDIRS);
+               return properties;
+       }
+       
+       @Override
+       public Set<Relationship> getRelationships() {
+               final Set<Relationship> relationships = new HashSet<>();
+               relationships.add(REL_SUCCESS);
+               return relationships;
+       }
+
+       private String getKey(final String directory) {
+               return getIdentifier() + ".lastListingTime." + directory;
+       }
+       
+       @OnPrimaryNodeStateChange
+       public void onPrimaryNodeChange(final PrimaryNodeState newState) {
+               if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) {
+                       electedPrimaryNode = true;
+               }
+       }
+       
+       @Override
+       public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+               if ( descriptor.equals(DIRECTORY) ) {
+                       lastListingTime = null; // clear lastListingTime so that we have to fetch new time
+                       latestPathsListed = new HashSet<>();
+               }
+       }
+       
+       private HDFSListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
+               final ObjectMapper mapper = new ObjectMapper();
+               final JsonNode jsonNode = mapper.readTree(serializedState);
+               return mapper.readValue(jsonNode, HDFSListing.class);
+       }
+       
+       
+       @Override
+       public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+               final String directory = context.getProperty(DIRECTORY).getValue();
+
+               // Determine the timestamp for the last file that we've listed.
+               Long minTimestamp = lastListingTime;
+               if ( minTimestamp == null || electedPrimaryNode ) {
+                       // We haven't yet restored any state from local or distributed storage, or this node has just
+                       // been elected the Primary Node. In either case, first attempt to get the timestamp from the
+                       // distributed cache service.
+                       final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+                       
+                       try {
+                               final StringSerDe serde = new StringSerDe();
+                               final String serializedState = client.get(getKey(directory), serde, serde);
+                               if ( serializedState == null || serializedState.isEmpty() ) {
+                                       minTimestamp = null;
+                                       this.latestPathsListed = Collections.emptySet();
+                               } else {
+                                       final HDFSListing listing = deserialize(serializedState);
+                                       this.lastListingTime = listing.getLatestTimestamp().getTime();
+                                       minTimestamp = listing.getLatestTimestamp().getTime();
+                                       this.latestPathsListed = listing.toPaths();
+                               }
+
+                               this.lastListingTime = minTimestamp;
+                               electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore.
+                       } catch (final IOException ioe) {
+                               getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
+                               context.yield();
+                               return;
+                       }
+
+                       // Check the persistence file. We want to use the latest timestamp that we have so that
+                       // we don't duplicate data.
+                       try {
+                               if ( persistenceFile.exists() ) {
+                                       try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+                                               final Properties props = new Properties();
+                                               props.load(fis);
+
+                                               // get the local timestamp for this directory, if it exists.
+                                               final String locallyPersistedValue = props.getProperty(directory);
+                                               if ( locallyPersistedValue != null ) {
+                                                       final HDFSListing listing = deserialize(locallyPersistedValue);
+                                                       final long localTimestamp = listing.getLatestTimestamp().getTime();
+
+                                                       // If distributed state doesn't have an entry or the local entry is later than the distributed state,
+                                                       // update the distributed state so that we are in sync.
+                                                       if (minTimestamp == null || localTimestamp > minTimestamp) {
+                                                               minTimestamp = localTimestamp;
+
+                                                               // Our local persistence file shows a later time than the Distributed service.
+                                                               // Update the distributed service to match our local state.
+                                                               try {
+                                                                       final StringSerDe serde = new StringSerDe();
+                                                                       client.put(getKey(directory), locallyPersistedValue, serde, serde);
+                                                               } catch (final IOException ioe) {
+                                                                       getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed "
+                                                                               + "state due to {}. If a new node performs HDFS Listing, data duplication may occur",
+                                                                               new Object[] {directory, locallyPersistedValue, ioe});
+                                                               }
+                                                       }
+                                               }
+                                       }
+                               }
+                       } catch (final IOException ioe) {
+                               getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe);
+                       }
+               }
+               
+               
+               // Pull in any file that is newer than the timestamp that we have.
+               final FileSystem hdfs = hdfsResources.get().getValue();
+               final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+               final Path rootPath = new Path(directory);
+
+               int listCount = 0;
+               Long latestListingModTime = null;
+               final Set<FileStatus> statuses;
+               try {
+                       statuses = getStatuses(rootPath, recursive, hdfs);
+                       for ( final FileStatus status : statuses ) {
+                               // don't get anything where the last modified timestamp is equal to our current timestamp.
+                               // if we do, then we run the risk of multiple files having the same last mod date but us only
+                               // seeing a portion of them.
+                               // I.e., there could be 5 files with last mod date = (now). But if we do the listing now, maybe
+                               // only 2 exist and 3 more will exist later in this millisecond. So we ignore anything with a
+                               // modified date not before the current time.
+                               final long fileModTime = status.getModificationTime();
+
+                               // we only want the file if its timestamp is later than the minTimestamp or equal to and we didn't pull it last time.
+                               // Also, HDFS creates files with the suffix _COPYING_ when they are being written - we want to ignore those.
+                               boolean fetch = !status.getPath().getName().endsWith("_COPYING_") &&
+                                               (minTimestamp == null || fileModTime > minTimestamp || (fileModTime == minTimestamp && !latestPathsListed.contains(status.getPath())));
+
+                               // Create the FlowFile for this path.
+                               if ( fetch ) {
+                                       final Map<String, String> attributes = createAttributes(status);
+                                       FlowFile flowFile = session.create();
+                                       flowFile = session.putAllAttributes(flowFile, attributes);
+                                       session.transfer(flowFile, REL_SUCCESS);
+                                       listCount++;
+
+                                       if ( latestListingModTime == null || fileModTime > latestListingModTime ) {
+                                               latestListingModTime = fileModTime;
+                                       }
+                               }
+                       }
+               } catch (final IOException ioe) {
+                       getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe});
+                       return;
+               }
+               
+               if ( listCount > 0 ) {
+                       getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
+                       session.commit();
+
+                       // We have performed a listing and pushed the FlowFiles out.
+                       // Now, we need to persist state about the Last Modified timestamp of the newest file
+                       // that we pulled in. We do this in order to avoid pulling in the same file twice.
+                       // However, we want to save the state both locally and remotely.
+                       // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
+                       // previous Primary Node left off.
+                       // We also store the state locally so that if the node is restarted, and the node cannot contact
+                       // the distributed state cache, the node can continue to run (if it is the Primary Node).
+                       String serializedState = null;
+                       try {
+                               serializedState = serializeState(latestListingModTime, statuses);
+                       } catch (final Exception e) {
+                               getLogger().error("Failed to serialize state due to {}", new Object[] {e});
+                       }
+
+                       if ( serializedState != null ) {
+                               // Save our state locally.
+                               try {
+                                       persistLocalState(directory, serializedState);
+                               } catch (final IOException ioe) {
+                                       getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe);
+                               }
+
+                               // Attempt to save state to remote server.
+                               final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+                               try {
+                                       client.put(getKey(directory), serializedState, new StringSerDe(), new StringSerDe());
+                               } catch (final IOException ioe) {
+                                       getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe);
+                               }
+                       }
+
+                       lastListingTime = latestListingModTime;
+               } else {
+                       getLogger().debug("There is no data to list. Yielding.");
+                       context.yield();
+
+                       // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
+                       if ( lastListingTime == null ) {
+                               lastListingTime = 0L;
+                       }
+
+                       return;
+               }
+       }
+       
+       private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs) throws IOException {
+               final Set<FileStatus> statusSet = new HashSet<>();
+
+               final FileStatus[] statuses = hdfs.listStatus(path);
+
+               for ( final FileStatus status : statuses ) {
+                       if ( status.isDirectory() ) {
+                               if ( recursive ) {
+                                       try {
+                                               statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs));
+                                       } catch (final IOException ioe) {
+                                               getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
+                                       }
+                               }
+                       } else {
+                               statusSet.add(status);
+                       }
+               }
+               
+               return statusSet;
+       }
+       
+       
+       private String serializeState(final long latestListingTime, final Set<FileStatus> statuses) throws JsonGenerationException, JsonMappingException, IOException {
+               // we need to keep track of all files that we pulled in that had a modification time equal to
+               // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files
+               // that have a mod time equal to that timestamp because more files may come in with the same timestamp
+               // later in the same millisecond.
+               if ( statuses.isEmpty() ) {
+                       return null;
+               } else {
+                       final List<FileStatus> sortedStatuses = new ArrayList<>(statuses);
+                       Collections.sort(sortedStatuses, new Comparator<FileStatus>() {
+                               @Override
+                               public int compare(final FileStatus o1, final FileStatus o2) {
+                                       return Long.compare(o1.getModificationTime(), o2.getModificationTime());
+                               }
+                       });
+
+                       final long latestListingModTime = sortedStatuses.get(sortedStatuses.size() - 1).getModificationTime();
+                       final Set<Path> pathsWithModTimeEqualToListingModTime = new HashSet<>();
+                       for (int i=sortedStatuses.size() - 1; i >= 0; i--) {
+                               final FileStatus status = sortedStatuses.get(i);
+                               if (status.getModificationTime() == latestListingModTime) {
+                                       pathsWithModTimeEqualToListingModTime.add(status.getPath());
+                               }
+                       }
+
+                       this.latestPathsListed = pathsWithModTimeEqualToListingModTime;
+
+                       final HDFSListing listing = new HDFSListing();
+                       listing.setLatestTimestamp(new Date(latestListingModTime));
+                       final Set<String> paths = new HashSet<>();
+                       for ( final Path path : pathsWithModTimeEqualToListingModTime ) {
+                               paths.add(path.toUri().toString());
+                       }
+                       listing.setMatchingPaths(paths);
+
+                       final ObjectMapper mapper = new ObjectMapper();
+                       final String serializedState = mapper.writerWithType(HDFSListing.class).writeValueAsString(listing);
+                       return serializedState;
+               }
+       }
+       
+       private void persistLocalState(final String directory, final String serializedState) throws IOException {
+               // Save the serialized state for this directory to the local persistence file so that, if the node
+               // is restarted and cannot reach the distributed cache, it can still resume from the last listing.
+               final File dir = persistenceFile.getParentFile();
+               if ( !dir.exists() && !dir.mkdirs() ) {
+                       throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state");
+               }
+
+               final Properties props = new Properties();
+               if ( persistenceFile.exists() ) {
+                       try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+                               props.load(fis);
+                       }
+               }
+
+               props.setProperty(directory, serializedState);
+
+               try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
+                       props.store(fos, null);
+               }
+       }
+
+       private String getAbsolutePath(final Path path) {
+               final Path parent = path.getParent();
+               final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
+               return prefix + "/" + path.getName();
+       }
+       
+       private Map<String, String> createAttributes(final FileStatus status) {
+               final Map<String, String> attributes = new HashMap<>();
+               attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
+               attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
+
+               attributes.put("hdfs.owner", status.getOwner());
+               attributes.put("hdfs.group", status.getGroup());
+               attributes.put("hdfs.lastModified", String.valueOf(status.getModificationTime()));
+               attributes.put("hdfs.length", String.valueOf(status.getLen()));
+               attributes.put("hdfs.replication", String.valueOf(status.getReplication()));
+
+               final FsPermission permission = status.getPermission();
+               final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
+               attributes.put("hdfs.permissions", perms);
+               return attributes;
+       }
+       
+       private String getPerms(final FsAction action) {
+               final StringBuilder sb = new StringBuilder();
+               if ( action.implies(FsAction.READ) ) {
+                       sb.append("r");
+               } else {
+                       sb.append("-");
+               }
+               
+               if ( action.implies(FsAction.WRITE) ) {
+                       sb.append("w");
+               } else {
+                       sb.append("-");
+               }
+               
+               if ( action.implies(FsAction.EXECUTE) ) {
+                       sb.append("x");
+               } else {
+                       sb.append("-");
+               }
+               
+               return sb.toString();
+       }
+}
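
The core dedupe rule in onTrigger/serializeState — remember the newest modification time seen, plus the exact set of paths carrying that timestamp — can be shown in isolation (a minimal model of the idea, not the processor's actual code):

    import java.util.HashSet;
    import java.util.Set;

    // Minimal model of ListHDFS's listing filter: a file is listed if its timestamp is
    // strictly later than the last listing's newest timestamp, or equal to it but not
    // among the paths already emitted at that timestamp.
    class ListingFilter {
        private Long lastTimestamp;
        private Set<String> pathsAtLastTimestamp = new HashSet<>();

        boolean shouldList(final String path, final long modTime) {
            if (lastTimestamp == null) {
                return true;  // no prior state: list everything
            }
            return modTime > lastTimestamp
                    || (modTime == lastTimestamp && !pathsAtLastTimestamp.contains(path));
        }

        // After a listing, keep only the newest timestamp and the paths at that instant.
        void commit(final long newestTimestamp, final Set<String> pathsAtNewest) {
            lastTimestamp = newestTimestamp;
            pathsAtLastTimestamp = pathsAtNewest;
        }
    }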

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
new file mode 100644
index 0000000..9f4d68b
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HDFSListing.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.xml.bind.annotation.XmlTransient;
+import javax.xml.bind.annotation.XmlType;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A simple POJO for maintaining state about the last HDFS Listing that was performed so that
+ * we can avoid pulling the same file multiple times.
+ */
+@XmlType(name = "listing")
+public class HDFSListing {
+       private Date latestTimestamp;
+       private Collection<String> matchingPaths;
+       
+       /**
+        * @return the modification date of the newest file that was contained in the HDFS Listing
+        */
+       public Date getLatestTimestamp() {
+               return latestTimestamp;
+       }
+       
+       /**
+        * Sets the timestamp of the modification date of the newest file that was contained in the HDFS Listing
+        *
+        * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the HDFS Listing
+        */
+       public void setLatestTimestamp(Date latestTimestamp) {
+               this.latestTimestamp = latestTimestamp;
+       }
+       
+       /**
+        * @return a Collection containing the paths of all files in the HDFS Listing whose Modification date
+        * was equal to {@link #getLatestTimestamp()}
+        */
+       @XmlTransient
+       public Collection<String> getMatchingPaths() {
+               return matchingPaths;
+       }
+       
+       /**
+        * @return a Collection of {@link Path} objects equivalent to those returned by {@link #getMatchingPaths()}
+        */
+       public Set<Path> toPaths() {
+               final Set<Path> paths = new HashSet<>(matchingPaths.size());
+               for ( final String pathname : matchingPaths ) {
+                       paths.add(new Path(pathname));
+               }
+               return paths;
+       }
+
+       /**
+        * Sets the Collection containing the paths of all files in the HDFS Listing whose Modification Date was
+        * equal to {@link #getLatestTimestamp()}
+        * @param matchingPaths the paths of all files whose modification date was equal to {@link #getLatestTimestamp()}
+        */
+       public void setMatchingPaths(Collection<String> matchingPaths) {
+               this.matchingPaths = matchingPaths;
+       }
+
+}
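
As a usage sketch, this is roughly how ListHDFS can populate the listing state and turn it back into Hadoop Path objects on a later run (the timestamp and file names below are made-up values, not part of this commit):

    import java.util.Arrays;
    import java.util.Date;

    import org.apache.hadoop.fs.Path;
    import org.apache.nifi.processors.hadoop.util.HDFSListing;

    public class HDFSListingDemo {
        public static void main(String[] args) {
            final HDFSListing listing = new HDFSListing();
            // Remember the newest modification time seen so far...
            listing.setLatestTimestamp(new Date(1429920000000L));
            // ...and the paths that shared that exact timestamp, so they
            // are not pulled again on the next listing.
            listing.setMatchingPaths(Arrays.asList("/data/a.txt", "/data/b.txt"));

            for (final Path path : listing.toPaths()) {
                System.out.println(path); // e.g. /data/a.txt
            }
        }
    }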

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
new file mode 100644
index 0000000..ef0e590
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/LongSerDe.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+
+public class LongSerDe implements Serializer<Long>, Deserializer<Long> {
+
+       @Override
+       public Long deserialize(final byte[] input) throws DeserializationException, IOException {
+               if ( input == null || input.length == 0 ) {
+                       return null;
+               }
+               
+               final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(input));
+               return dis.readLong();
+       }
+
+       @Override
+       public void serialize(final Long value, final OutputStream out) throws SerializationException, IOException {
+               final DataOutputStream dos = new DataOutputStream(out);
+               dos.writeLong(value);
+       }
+
+}
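
A quick round-trip sketch for LongSerDe: the value is written as eight big-endian bytes via DataOutputStream and read back by the same serde (the class name LongSerDeDemo and the sample value are illustrative only):

    import java.io.ByteArrayOutputStream;

    import org.apache.nifi.processors.hadoop.util.LongSerDe;

    public class LongSerDeDemo {
        public static void main(String[] args) throws Exception {
            final LongSerDe serDe = new LongSerDe();
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            serDe.serialize(1429920000000L, baos);  // written as 8 big-endian bytes
            System.out.println(serDe.deserialize(baos.toByteArray())); // 1429920000000
        }
    }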

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
new file mode 100644
index 0000000..848831f
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/StringSerDe.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+
+public class StringSerDe implements Serializer<String>, Deserializer<String> {
+
+       @Override
+       public String deserialize(final byte[] value) throws DeserializationException, IOException {
+               if ( value == null ) {
+                       return null;
+               }
+               
+               return new String(value, StandardCharsets.UTF_8);
+       }
+
+       @Override
+       public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
+               out.write(value.getBytes(StandardCharsets.UTF_8));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index da16ef7..4b359e8 100644
--- a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,7 +12,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.processors.hadoop.GetHDFS
-org.apache.nifi.processors.hadoop.PutHDFS
 org.apache.nifi.processors.hadoop.CreateHadoopSequenceFile
+org.apache.nifi.processors.hadoop.FetchHDFS
+org.apache.nifi.processors.hadoop.GetHDFS
 org.apache.nifi.processors.hadoop.GetHDFSSequenceFile
+org.apache.nifi.processors.hadoop.ListHDFS
+org.apache.nifi.processors.hadoop.PutHDFS

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
index eed0d36..df7297a 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
@@ -178,6 +178,11 @@ public class TestDetectDuplicate {
             exists = false;
             return true;
         }
+
+        @Override
+        public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+            // no-op: this test stub only needs to satisfy the new interface method
+        }
     }
 
     private static class StringSerializer implements Serializer<String> {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
index 8283137..975dc63 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
@@ -83,6 +83,20 @@ public interface DistributedMapCacheClient extends ControllerService {
     <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException;
 
     /**
+     * Adds the specified key and value to the cache, overwriting any value that is
+     * currently set.
+     * 
+     * @param key The key to set
+     * @param value The value to associate with the given Key
+     * @param keySerializer the Serializer that will be used to serialize the key into bytes
+     * @param valueSerializer the Serializer that will be used to serialize the value into bytes
+     * 
+     * @throws IOException if unable to communicate with the remote instance
+     * @throws NullPointerException if the key or either serializer is null
+     */
+    <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException;
+    
+    /**
      * Returns the value in the cache for the given key, if one exists;
      * otherwise returns <code>null</code>
      *
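
As a consumer-side sketch, a processor such as ListHDFS can persist state under a well-known key with the new method, pairing it with the serdes added above (the key name, helper method, and client wiring here are illustrative assumptions, not part of this commit):

    import java.io.IOException;

    import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    import org.apache.nifi.processors.hadoop.util.LongSerDe;
    import org.apache.nifi.processors.hadoop.util.StringSerDe;

    public class PutUsageSketch {
        // Stores a timestamp under a fixed key, overwriting any prior value.
        static void storeLatestTimestamp(final DistributedMapCacheClient client, final long timestamp) throws IOException {
            client.put("ListHDFS.latestTimestamp", timestamp, new StringSerDe(), new LongSerDe());
        }
    }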

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index 92bda8f..06ff42b 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -116,6 +116,28 @@ public class DistributedMapCacheClientService extends AbstractControllerService
         });
     }
 
+    @Override
+    public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+        withCommsSession(new CommsAction<Object>() {
+            @Override
+            public Object execute(final CommsSession session) throws IOException {
+                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+                dos.writeUTF("put");
+
+                serialize(key, keySerializer, dos);
+                serialize(value, valueSerializer, dos);
+
+                dos.flush();
+
+                // The server acknowledges a successful put by writing 'true'.
+                final DataInputStream dis = new DataInputStream(session.getInputStream());
+                final boolean success = dis.readBoolean();
+                if ( !success ) {
+                    throw new IOException("Expected to receive confirmation of 'put' request but received unexpected response");
+                }
+
+                return null;
+            }
+        });
+    }
+
     @Override
     public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
         return withCommsSession(new CommsAction<Boolean>() {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
index 534cb0b..8903046 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 public interface MapCache {
 
     MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException;
+    MapPutResult put(ByteBuffer key, ByteBuffer value) throws IOException;
     boolean containsKey(ByteBuffer key) throws IOException;
     ByteBuffer get(ByteBuffer key) throws IOException;
     ByteBuffer remove(ByteBuffer key) throws IOException;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
index e4a600e..cf8996c 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
@@ -65,6 +65,13 @@ public class MapCacheServer extends AbstractCacheServer {
                 dos.writeBoolean(putResult.isSuccessful());
                 break;
             }
+            case "put": {
+               final byte[] key = readValue(dis);
+               final byte[] value = readValue(dis);
+               cache.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
+                dos.writeBoolean(true);
+               break;
+            }
             case "containsKey": {
                 final byte[] key = readValue(dis);
                final boolean contains = cache.containsKey(ByteBuffer.wrap(key));
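
For clarity, the client side of one "put" round trip looks roughly like the following. This is a sketch only: it assumes readValue(...) consumes a 4-byte length prefix followed by the raw bytes, matching how the client-side serialize(...) helper frames values; that framing is inferred, not shown in this diff.

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class PutExchangeSketch {
        // Client side of one "put" round trip over an established session.
        static boolean put(final DataOutputStream dos, final DataInputStream dis,
                           final byte[] key, final byte[] value) throws IOException {
            dos.writeUTF("put");        // operation name, dispatched by MapCacheServer
            dos.writeInt(key.length);   // assumed length-prefixed key frame
            dos.write(key);
            dos.writeInt(value.length); // assumed length-prefixed value frame
            dos.write(value);
            dos.flush();
            return dis.readBoolean();   // server answers true once the value is stored
        }
    }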

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
index 77fb77d..82b1787 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
@@ -78,6 +78,30 @@ public class PersistentMapCache implements MapCache {
         
         return putResult;
     }
+    
+    @Override
+    public MapPutResult put(final ByteBuffer key, final ByteBuffer value) throws IOException {
+        final MapPutResult putResult = wrapped.put(key, value);
+        if ( putResult.isSuccessful() ) {
+            // The put was successful.
+            final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value);
+            final List<MapWaliRecord> records = new ArrayList<>();
+            records.add(record);
+
+            if ( putResult.getEvictedKey() != null ) {
+                records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue()));
+            }
+
+            // Write all records, including any eviction, to the write-ahead log.
+            wali.update(records, false);
+
+            final long modCount = modifications.getAndIncrement();
+            if ( modCount > 0 && modCount % 100000 == 0 ) {
+                wali.checkpoint();
+            }
+        }
+
+        return putResult;
+    }
 
     @Override
     public boolean containsKey(final ByteBuffer key) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/94945a6f/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
index 10139f1..d8f9c45 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
@@ -106,6 +106,28 @@ public class SimpleMapCache implements MapCache {
         }
     }
     
+
+    @Override
+    public MapPutResult put(final ByteBuffer key, final ByteBuffer value) {
+        writeLock.lock();
+        try {
+            // evict if we need to in order to make room for a new entry.
+            final MapCacheRecord evicted = evict();
+
+            final MapCacheRecord record = new MapCacheRecord(key, value);
+            final MapCacheRecord existing = cache.put(key, record);
+            if ( existing != null ) {
+                // drop the inverse mapping for the record we just replaced so it does not go stale
+                inverseCacheMap.remove(existing);
+            }
+            inverseCacheMap.put(record, key);
+
+            final ByteBuffer existingValue = (existing == null) ? null : existing.getValue();
+            final ByteBuffer evictedKey = (evicted == null) ? null : evicted.getKey();
+            final ByteBuffer evictedValue = (evicted == null) ? null : evicted.getValue();
+
+            return new MapPutResult(true, key, value, existingValue, evictedKey, evictedValue);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+    
     @Override
     public boolean containsKey(final ByteBuffer key) {
         readLock.lock();
