http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 0000000,20ac738..0610d8f
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@@ -1,0 -1,563 +1,562 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.hadoop;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.BlockingQueue;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantLock;
+ import java.util.regex.Pattern;
+ 
++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.Tags;
 -import org.apache.nifi.processor.annotation.TriggerWhenEmpty;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.StopWatch;
 -
+ import org.apache.commons.io.IOUtils;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FSDataInputStream;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.PathFilter;
+ 
+ /**
+  * This processor reads files from HDFS into NiFi FlowFiles.
+  */
+ @TriggerWhenEmpty
+ @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "filesystem"})
+ @CapabilityDescription("Fetch files from Hadoop Distributed File System 
(HDFS) into FlowFiles")
+ public class GetHDFS extends AbstractHadoopProcessor {
+ 
+     public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
+     public static final int BUFFER_SIZE_DEFAULT = 4096;
+     public static final int MAX_WORKING_QUEUE_SIZE = 25000;
+ 
+     // relationships
+     public static final Relationship REL_SUCCESS = new Relationship.Builder()
+             .name("success")
+             .description("All files retrieved from HDFS are transferred to 
this relationship")
+             .build();
+ 
+     public static final Relationship REL_PASSTHROUGH = new 
Relationship.Builder()
+             .name("passthrough")
+             .description(
+                     "If this processor has an input queue for some reason, 
then FlowFiles arriving on that input are transferred to this relationship")
+             .build();
+ 
+     // properties
+     public static final PropertyDescriptor DIRECTORY = new 
PropertyDescriptor.Builder()
+             .name(DIRECTORY_PROP_NAME)
+             .description("The HDFS directory from which files should be read")
+             .required(true)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .build();
+ 
+     public static final PropertyDescriptor RECURSE_SUBDIRS = new 
PropertyDescriptor.Builder()
+             .name("Recurse Subdirectories")
+             .description("Indicates whether to pull files from subdirectories 
of the HDFS directory")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("true")
+             .build();
+ 
+     public static final PropertyDescriptor KEEP_SOURCE_FILE = new 
PropertyDescriptor.Builder()
+             .name("Keep Source File")
+             .description("Determines whether to delete the file from HDFS 
after it has been successfully transferred")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     public static final PropertyDescriptor FILE_FILTER_REGEX = new 
PropertyDescriptor.Builder()
+             .name("File Filter Regex")
+             .description(
+                     "A Java Regular Expression for filtering Filenames; if a 
filter is supplied then only files whose names match that Regular Expression 
will be fetched, otherwise all files will be fetched")
+             .required(false)
+             .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+             .build();
+ 
+     public static final PropertyDescriptor FILTER_MATCH_NAME_ONLY = new 
PropertyDescriptor.Builder()
+             .name("Filter Match Name Only")
+             .description(
+                     "If true then File Filter Regex will match on just the 
filename, otherwise subdirectory names will be included with filename in the 
regex comparison")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("true")
+             .build();
+ 
+     public static final PropertyDescriptor IGNORE_DOTTED_FILES = new 
PropertyDescriptor.Builder()
+             .name("Ignore Dotted Files")
+             .description("If true, files whose names begin with a dot (\".\") 
will be ignored")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("true")
+             .build();
+ 
+     public static final PropertyDescriptor MIN_AGE = new 
PropertyDescriptor.Builder()
+             .name("Minimum File Age")
+             .description(
+                     "The minimum age that a file must be in order to be 
pulled; any file younger than this amount of time (based on last modification 
date) will be ignored")
+             .required(true)
+             .addValidator(
+                     StandardValidators.createTimePeriodValidator(0, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+             .defaultValue("0 sec")
+             .build();
+ 
+     public static final PropertyDescriptor MAX_AGE = new 
PropertyDescriptor.Builder()
+             .name("Maximum File Age")
+             .description(
+                     "The maximum age that a file must be in order to be 
pulled; any file older than this amount of time (based on last modification 
date) will be ignored")
+             .required(false)
+             .addValidator(
+                     StandardValidators.createTimePeriodValidator(100, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+             .build();
+ 
+     public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+             .name("Batch Size")
+             .description("The maximum number of files to pull in each 
iteration, based on run schedule.")
+             .required(true)
+             .defaultValue("100")
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .build();
+ 
+     public static final PropertyDescriptor POLLING_INTERVAL = new 
PropertyDescriptor.Builder()
+             .name("Polling Interval")
+             .description("Indicates how long to wait between performing 
directory listings")
+             .required(true)
+             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+             .defaultValue("0 sec")
+             .build();
+ 
+     public static final PropertyDescriptor BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+             .name("IO Buffer Size")
+             .description("Amount of memory to use to buffer file contents 
during IO. This overrides the Hadoop Configuration")
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .build();
+ 
+     private static final Set<Relationship> relationships;
+     protected static final List<PropertyDescriptor> localProperties;
+ 
+     static {
+         final Set<Relationship> rels = new HashSet<>();
+         rels.add(REL_SUCCESS);
+         rels.add(REL_PASSTHROUGH);
+         relationships = Collections.unmodifiableSet(rels);
+ 
+         List<PropertyDescriptor> props = new ArrayList<>(properties);
+         props.add(DIRECTORY);
+         props.add(RECURSE_SUBDIRS);
+         props.add(KEEP_SOURCE_FILE);
+         props.add(FILE_FILTER_REGEX);
+         props.add(FILTER_MATCH_NAME_ONLY);
+         props.add(IGNORE_DOTTED_FILES);
+         props.add(MIN_AGE);
+         props.add(MAX_AGE);
+         props.add(POLLING_INTERVAL);
+         props.add(BATCH_SIZE);
+         props.add(BUFFER_SIZE);
+         localProperties = Collections.unmodifiableList(props);
+     }
+ 
+     protected ProcessorConfiguration processorConfig;
+     private final AtomicLong logEmptyListing = new AtomicLong(2L);
+ 
+     private final AtomicLong lastPollTime = new AtomicLong(0L);
+     private final Lock listingLock = new ReentrantLock();
+     private final Lock queueLock = new ReentrantLock();
+ 
+     private final BlockingQueue<Path> filePathQueue = new 
LinkedBlockingQueue<>(MAX_WORKING_QUEUE_SIZE);
+     private final BlockingQueue<Path> processing = new 
LinkedBlockingQueue<>();
+ 
+     /**
+      * Exposes the relationship set assembled in the static initializer.
+      *
+      * @return the unmodifiable set containing REL_SUCCESS and REL_PASSTHROUGH
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return GetHDFS.relationships;
+     }
+ 
+     /**
+      * Exposes the property list assembled in the static initializer: the
+      * inherited {@code properties} followed by the descriptors declared in
+      * this class.
+      *
+      * @return the unmodifiable list of supported property descriptors
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return GetHDFS.localProperties;
+     }
+ 
+     /**
+      * Runs the inherited validation and additionally rejects a configuration
+      * whose Minimum File Age exceeds its Maximum File Age.
+      *
+      * @param context the validation context supplying the property values
+      * @return the inherited results plus, when the minimum age is greater
+      *         than the maximum age, one invalid result describing the conflict
+      */
+     @Override
+     protected Collection<ValidationResult> customValidate(ValidationContext context) {
+         final List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
+ 
+         // an unset age falls back to the widest possible bound on its side
+         final Long configuredMin = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+         final Long configuredMax = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+         final long lowerBound = (configuredMin != null) ? configuredMin : 0L;
+         final long upperBound = (configuredMax != null) ? configuredMax : Long.MAX_VALUE;
+ 
+         if (lowerBound <= upperBound) {
+             return results;
+         }
+ 
+         final String explanation = MIN_AGE.getName() + " cannot be greater than " + MAX_AGE.getName();
+         results.add(new ValidationResult.Builder()
+                 .valid(false)
+                 .subject("GetHDFS Configuration")
+                 .explanation(explanation)
+                 .build());
+         return results;
+     }
+ 
+     /**
+      * Prepares the processor for a run: delegates to
+      * {@code abstractOnScheduled}, snapshots the property values, verifies
+      * that the configured directory exists, and empties both work queues.
+      *
+      * @param context the process context supplying the property values
+      * @throws IOException if the configured directory does not exist or the
+      *         existence check itself fails
+      */
+     @OnScheduled
+     public void onScheduled(ProcessContext context) throws IOException {
+         abstractOnScheduled(context);
+ 
+         // snapshot the property values so they can be handed around as one object
+         processorConfig = new ProcessorConfiguration(context);
+ 
+         final FileSystem fileSystem = hdfsResources.get().getValue();
+         final Path configuredDir = new Path(context.getProperty(DIRECTORY).getValue());
+         final boolean dirPresent = fileSystem.exists(configuredDir);
+         if (!dirPresent) {
+             throw new IOException("PropertyDescriptor " + DIRECTORY + " has invalid value " + configuredDir
+                     + ". The directory does not exist.");
+         }
+ 
+         // HDFS contents may have changed while the processor was stopped,
+         // so drop whatever queue state was remembered from the previous run
+         queueLock.lock();
+         try {
+             filePathQueue.clear();
+             processing.clear();
+         } finally {
+             queueLock.unlock();
+         }
+     }
+ 
+     @Override
+     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+ 
+         int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+         final List<Path> files = new ArrayList<>(batchSize);
+         List<FlowFile> inputFlowFiles = session.get(10);
+         for (FlowFile ff : inputFlowFiles) {
+             session.transfer(ff, REL_PASSTHROUGH);
+         }
+ 
+         // retrieve new file names from HDFS and place them into work queue
+         if (filePathQueue.size() < MAX_WORKING_QUEUE_SIZE / 2) {
+             try {
+                 final StopWatch stopWatch = new StopWatch(true);
+                 Set<Path> listedFiles = performListing(context);
+                 stopWatch.stop();
+                 final long millis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ 
+                 if (listedFiles != null) {
+                     // place files into the work queue
+                     int newItems = 0;
+                     queueLock.lock();
+                     try {
+                         for (Path file : listedFiles) {
+                             if (!filePathQueue.contains(file) && 
!processing.contains(file)) {
+                                 if (!filePathQueue.offer(file)) {
+                                     break;
+                                 }
+                                 newItems++;
+                             }
+                         }
+                     } catch (Exception e) {
+                         getLogger().warn("Could not add to processing queue 
due to {}", new Object[]{e});
+                     } finally {
+                         queueLock.unlock();
+                     }
+                     if (listedFiles.size() > 0) {
+                         logEmptyListing.set(3L);
+                     }
+                     if (logEmptyListing.getAndDecrement() > 0) {
+                         getLogger().info("Obtained file listing in {} 
milliseconds; listing had {} items, {} of which were new",
+                                 new Object[]{millis, listedFiles.size(), 
newItems});
+                     }
+                 }
+             } catch (IOException e) {
+                 context.yield();
+                 getLogger().warn("Error while retrieving list of files due to 
{}", new Object[]{e});
+                 return;
+             }
+         }
+ 
+         // prepare to process a batch of files in the queue
+         queueLock.lock();
+         try {
+             filePathQueue.drainTo(files, batchSize);
+             if (files.isEmpty()) {
+                 // nothing to do!
+                 context.yield();
+                 return;
+             }
+             processing.addAll(files);
+         } finally {
+             queueLock.unlock();
+         }
+ 
+         processBatchOfFiles(files, context, session);
+ 
+         queueLock.lock();
+         try {
+             processing.removeAll(files);
+         } finally {
+             queueLock.unlock();
+         }
+     }
+ 
+     /**
+      * Imports each of the given HDFS files into its own FlowFile and routes it
+      * to REL_SUCCESS, committing the session after every file. Files that no
+      * longer exist are skipped. Unless Keep Source File is true the source file
+      * is deleted after import; if that delete fails the FlowFile is removed
+      * and the file is not ingested.
+      *
+      * @param files the HDFS paths to ingest
+      * @param context supplies the Keep Source File, IO Buffer Size and
+      *        directory properties; yielded when a file fails
+      * @param session the session used to create, populate and transfer FlowFiles
+      */
+     protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
+         // process the batch of files
+         FSDataInputStream stream = null;
+         Configuration conf = hdfsResources.get().getKey();
+         FileSystem hdfs = hdfsResources.get().getValue();
+         final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
+         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
+         // the processor property, when set, takes precedence over the Hadoop configuration
+         int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
+                 BUFFER_SIZE_DEFAULT);
+         final Path rootDir = new Path(context.getProperty(DIRECTORY).getValue());
+         for (final Path file : files) {
+             try {
+                 if (!hdfs.exists(file)) {
+                     continue; // if file is no longer there then move on
+                 }
+                 final String filename = file.getName();
+                 final String relativePath = getPathDifference(rootDir, file);
+ 
+                 stream = hdfs.open(file, bufferSize);
+                 FlowFile flowFile = session.create();
+ 
+                 final StopWatch stopWatch = new StopWatch(true);
+                 flowFile = session.importFrom(stream, flowFile);
+                 stopWatch.stop();
+                 final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
+                 final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ 
+                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePath);
+                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), filename);
+ 
+                 if (!keepSourceFiles && !hdfs.delete(file, false)) {
+                     getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...",
+                             new Object[]{file});
+                     session.remove(flowFile);
+                     continue;
+                 }
+ 
+                 // Report the complete path as the transit URI, as GetHDFSSequenceFile does.
+                 // Prefixing only the base name with "hdfs://" dropped the directory and put the
+                 // file name in the authority position; a base name never starts with "/".
+                 final String transitUri = file.toString();
+                 session.getProvenanceReporter().receive(flowFile, transitUri);
+                 session.transfer(flowFile, REL_SUCCESS);
+                 getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",
+                         new Object[]{flowFile, file, millis, dataRate});
+                 session.commit();
+             } catch (final Throwable t) {
+                 getLogger().error("Error retrieving file {} from HDFS due to {}", new Object[]{file, t});
+                 session.rollback();
+                 context.yield();
+             } finally {
+                 // the stream variable is shared across iterations, so close and clear it every time
+                 IOUtils.closeQuietly(stream);
+                 stream = null;
+             }
+         }
+     }
+ 
+     /**
+      * Lists the configured HDFS directory, provided the Polling Interval has
+      * elapsed since the last listing and the listing lock can be acquired
+      * without waiting.
+      *
+      * @param context supplies the Polling Interval property
+      * @return {@code null} when no listing was performed (the interval has not
+      *         elapsed or the listing lock was not available); otherwise the
+      *         set of selected paths, which is empty when nothing on HDFS
+      *         matched the configured filters
+      * @throws java.io.IOException if selecting the files fails
+      */
+     protected Set<Path> performListing(final ProcessContext context) throws IOException {
+         final long intervalMillis = context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
+         final long earliestNextPoll = lastPollTime.get() + intervalMillis;
+ 
+         if (System.currentTimeMillis() < earliestNextPoll) {
+             return null;
+         }
+         if (!listingLock.tryLock()) {
+             return null;
+         }
+ 
+         try {
+             final FileSystem fileSystem = hdfsResources.get().getValue();
+             // get listing
+             final Set<Path> found = selectFiles(fileSystem, processorConfig.getConfiguredRootDirPath(), null);
+             lastPollTime.set(System.currentTimeMillis());
+             return found;
+         } finally {
+             listingLock.unlock();
+         }
+     }
+ 
+     /**
+      * Collects the files under {@code dir} that pass the configured path
+      * filter and whose age lies strictly between the configured minimum and
+      * maximum age. Subdirectories are descended into only when recursion is
+      * configured.
+      *
+      * @param hdfs the file system to list
+      * @param dir the directory to list
+      * @param filesVisited paths already seen during this listing, or
+      *        {@code null} to start with an empty set
+      * @return the selected file paths; empty when nothing matched
+      * @throws java.io.IOException if {@code dir} does not exist or the
+      *         listing fails
+      */
+     protected Set<Path> selectFiles(final FileSystem hdfs, final Path dir, Set<Path> filesVisited) throws IOException {
+         if (filesVisited == null) {
+             filesVisited = new HashSet<>();
+         }
+ 
+         final boolean dirPresent = hdfs.exists(dir);
+         if (!dirPresent) {
+             throw new IOException("Selection directory " + dir.toString() + " doesn't appear to exist!");
+         }
+ 
+         final Set<Path> selected = new HashSet<>();
+ 
+         for (final FileStatus status : hdfs.listStatus(dir)) {
+             // no need to make the set larger than what we would queue anyway
+             if (selected.size() >= MAX_WORKING_QUEUE_SIZE) {
+                 break;
+             }
+ 
+             final Path candidate = status.getPath();
+ 
+             // skip paths we've already seen (may be looping directory links)
+             final boolean firstVisit = filesVisited.add(candidate);
+             if (!firstVisit) {
+                 continue;
+             }
+ 
+             if (status.isDirectory()) {
+                 if (processorConfig.getRecurseSubdirs()) {
+                     selected.addAll(selectFiles(hdfs, candidate, filesVisited));
+                 }
+                 continue;
+             }
+ 
+             if (!processorConfig.getPathFilter().accept(candidate)) {
+                 continue;
+             }
+ 
+             final long ageMillis = System.currentTimeMillis() - status.getModificationTime();
+             if (processorConfig.getMinimumAge() < ageMillis && ageMillis < processorConfig.getMaximumAge()) {
+                 selected.add(candidate);
+ 
+                 if (getLogger().isDebugEnabled()) {
+                     getLogger().debug(this + " selected file at path: " + candidate.toString());
+                 }
+             }
+         }
+         return selected;
+     }
+ 
+     /**
+      * Returns the relative path of the child that does not include the 
filename
+      * or the root path.
+      * @param root
+      * @param child
+      * @return 
+      */
+     public static String getPathDifference(final Path root, final Path child) 
{
+         final int depthDiff = child.depth() - root.depth();
+         if (depthDiff <= 1) {
+             return "".intern();
+         }
+         String lastRoot = root.getName();
+         Path childsParent = child.getParent();
+         final StringBuilder builder = new StringBuilder();
+         builder.append(childsParent.getName());
+         for (int i = (depthDiff - 3); i >= 0; i--) {
+             childsParent = childsParent.getParent();
+             String name = childsParent.getName();
+             if (name.equals(lastRoot) && 
childsParent.toString().endsWith(root.toString())) {
+                 break;
+             }
+             builder.insert(0, Path.SEPARATOR).insert(0, name);
+         }
+         return builder.toString();
+     }
+ 
+     /**
+      * Immutable snapshot of the processor properties that the listing code
+      * needs, taken from a ProcessContext at construction time, together with
+      * the PathFilter derived from them.
+      */
+     protected static class ProcessorConfiguration {
+ 
+         private final Path configuredRootDirPath;
+         private final Pattern fileFilterPattern;
+         private final boolean ignoreDottedFiles;
+         private final boolean filterMatchBasenameOnly;
+         private final long minimumAge;
+         private final long maximumAge;
+         private final boolean recurseSubdirs;
+         private final PathFilter pathFilter;
+ 
+         /**
+          * Reads every relevant property from the context once.
+          *
+          * @param context the process context supplying the property values
+          */
+         ProcessorConfiguration(final ProcessContext context) {
+             configuredRootDirPath = new Path(context.getProperty(DIRECTORY).getValue());
+             ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
+ 
+             // no regex configured means no pattern, i.e. every name matches
+             final String regex = context.getProperty(FILE_FILTER_REGEX).getValue();
+             if (regex == null) {
+                 fileFilterPattern = null;
+             } else {
+                 fileFilterPattern = Pattern.compile(regex);
+             }
+             filterMatchBasenameOnly = context.getProperty(FILTER_MATCH_NAME_ONLY).asBoolean();
+ 
+             // unset ages fall back to the widest possible bounds
+             final Long configuredMin = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+             minimumAge = (configuredMin != null) ? configuredMin : 0L;
+             final Long configuredMax = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+             maximumAge = (configuredMax != null) ? configuredMax : Long.MAX_VALUE;
+ 
+             recurseSubdirs = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+             pathFilter = new PathFilter() {
+ 
+                 @Override
+                 public boolean accept(Path path) {
+                     final String baseName = path.getName();
+                     if (ignoreDottedFiles && baseName.startsWith(".")) {
+                         return false;
+                     }
+ 
+                     String candidate = baseName;
+                     if (!filterMatchBasenameOnly) {
+                         // prepend the directories between the provided root dir and the file
+                         final String parentPart = getPathDifference(configuredRootDirPath, path);
+                         if (parentPart.length() != 0) {
+                             candidate = parentPart + Path.SEPARATOR + baseName;
+                         }
+                     }
+ 
+                     return fileFilterPattern == null || fileFilterPattern.matcher(candidate).matches();
+                 }
+ 
+             };
+         }
+ 
+         /** @return the directory configured as the listing root */
+         public Path getConfiguredRootDirPath() {
+             return configuredRootDirPath;
+         }
+ 
+         /** @return the configured minimum file age in milliseconds, 0 when unset */
+         protected long getMinimumAge() {
+             return minimumAge;
+         }
+ 
+         /** @return the configured maximum file age in milliseconds, Long.MAX_VALUE when unset */
+         protected long getMaximumAge() {
+             return maximumAge;
+         }
+ 
+         /** @return whether subdirectories should be descended into */
+         public boolean getRecurseSubdirs() {
+             return recurseSubdirs;
+         }
+ 
+         /** @return the filter applying the dotted-file and regex rules */
+         protected PathFilter getPathFilter() {
+             return pathFilter;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
index 0000000,5581430..ec8b5e6
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
+++ 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFile.java
@@@ -1,0 -1,146 +1,145 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.hadoop;
+ 
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ 
++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.Tags;
 -import org.apache.nifi.processor.annotation.TriggerWhenEmpty;
+ import org.apache.nifi.processors.hadoop.util.SequenceFileReader;
+ import org.apache.nifi.util.StopWatch;
+ import org.apache.nifi.util.Tuple;
 -
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ 
+ /**
+  * This processor is used to pull files from HDFS. The files being pulled in
+  * MUST be SequenceFile formatted files. The processor creates a flow file for
+  * each key/value entry in the ingested SequenceFile. The created flow file's
+  * content depends on the value of the optional configuration property 
FlowFile
+  * Content. Currently, there are two choices: VALUE ONLY and KEY VALUE PAIR.
+  * With the former, only the SequenceFile value element is written to the flow
+  * file contents. With the latter, the SequenceFile key and value are written 
to
+  * the flow file contents as serialized objects; the format is key length 
(int),
+  * key(String), value length(int), value(bytes). The default is VALUE ONLY.
+  * <p>
+  * NOTE: This processor loads the entire value entry into memory. While the 
size
+  * limit for a value entry is 2GB, this will cause memory problems if there 
are
+  * too many concurrent tasks and the data being ingested is large.
+  *
+  */
+ @TriggerWhenEmpty
+ @Tags({"hadoop", "HDFS", "get", "fetch", "ingest", "source", "sequence file"})
+ @CapabilityDescription("Fetch sequence files from Hadoop Distributed File 
System (HDFS) into FlowFiles")
+ public class GetHDFSSequenceFile extends GetHDFS {
+ 
+     static final String VALUE_ONLY = "VALUE ONLY";
+ 
+     static final PropertyDescriptor FLOWFILE_CONTENT = new 
PropertyDescriptor.Builder()
+             .name("FlowFile Content")
+             .description("Indicate if the content is to be both the key and 
value of the Sequence File, or just the value.")
+             .allowableValues(VALUE_ONLY, "KEY VALUE PAIR")
+             .defaultValue(VALUE_ONLY)
+             .required(true)
+             .build();
+ 
+     static final List<PropertyDescriptor> props;
+ 
+     static {
+         List<PropertyDescriptor> someProps = new ArrayList<>(localProperties);
+         someProps.add(FLOWFILE_CONTENT);
+         props = Collections.unmodifiableList(someProps);
+     }
+ 
+     /**
+      * Exposes the property list assembled in the static initializer: the
+      * parent's {@code localProperties} followed by FLOWFILE_CONTENT.
+      *
+      * @return the unmodifiable list of supported property descriptors
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return GetHDFSSequenceFile.props;
+     }
+ 
+     /**
+      * Reads each of the given SequenceFiles through a reader chosen by the
+      * FlowFile Content property and routes the resulting FlowFiles to
+      * REL_SUCCESS. Files that no longer exist are skipped, and the loop stops
+      * early once the processor is no longer scheduled. Unless Keep Source File
+      * is true the source file is deleted after it has been read.
+      *
+      * @param files the HDFS paths of the SequenceFiles to ingest
+      * @param context supplies the FlowFile Content, Keep Source File and IO
+      *        Buffer Size properties; yielded when a file fails
+      * @param session the session handed to the reader and used for transfer
+      */
+     @Override
+     protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
+         final Tuple<Configuration, FileSystem> hadoopResources = hdfsResources.get();
+         final Configuration conf = hadoopResources.getKey();
+         final FileSystem hdfs = hadoopResources.getValue();
+         final String flowFileContentValue = context.getProperty(FLOWFILE_CONTENT).getValue();
+         final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
+         final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
+         // the processor property, when set, overrides the buffer size in the Hadoop configuration
+         if (bufferSizeProp != null) {
+             int bufferSize = bufferSizeProp.intValue();
+             conf.setInt(BUFFER_SIZE_KEY, bufferSize);
+         }
+         ProcessorLog logger = getLogger();
+         final SequenceFileReader<Set<FlowFile>> reader;
+         if (flowFileContentValue.equalsIgnoreCase(VALUE_ONLY)) {
+             reader = new ValueReader(session);
+         } else {
+             reader = new KeyValueReader(session);
+         }
+         for (final Path file : files) {
+             if (!this.isScheduled()) {
+                 break; // This processor should stop running immediately.
+             }
+ 
+             // Start every file with an empty result. The finally block below runs for skipped
+             // and failed files too and must not see the FlowFiles of the previous file, which
+             // have already been reported and transferred.
+             Set<FlowFile> flowFiles = Collections.emptySet();
+             final StopWatch stopWatch = new StopWatch(false);
+             try {
+                 stopWatch.start();
+                 if (!hdfs.exists(file)) {
+                     continue; // If file is no longer here move on.
+                 }
+                 logger.debug("Reading file");
+                 flowFiles = reader.readSequenceFile(file, conf, hdfs);
+                 if (!keepSourceFiles && !hdfs.delete(file, false)) {
+                     logger.warn("Unable to delete path " + file.toString()
+                             + " from HDFS.  Will likely be picked up over and over...");
+                 }
+             } catch (Throwable t) {
+                 logger.error("Error retrieving file {} from HDFS due to {}", new Object[]{file, t});
+                 session.rollback();
+                 // whatever was read for this file has just been rolled back, so there is nothing to transfer
+                 flowFiles = Collections.emptySet();
+                 context.yield();
+             } finally {
+                 stopWatch.stop();
+                 long totalSize = 0;
+                 for (FlowFile flowFile : flowFiles) {
+                     totalSize += flowFile.getSize();
+                     session.getProvenanceReporter().receive(flowFile, file.toString());
+                 }
+                 if (totalSize > 0) {
+                     final String dataRate = stopWatch.calculateDataRate(totalSize);
+                     final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+                     logger.info("Created {} flowFiles from SequenceFile {}. Ingested in {} milliseconds at a rate of {}", new Object[]{
+                         flowFiles.size(), file.toUri().toASCIIString(), millis, dataRate});
+                     logger.info("Transferred flowFiles {}  to success", new Object[]{flowFiles});
+                     session.transfer(flowFiles, REL_SUCCESS);
+                 }
+             }
+         }
+ 
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 0000000,e84b575..9a5aa74
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ 
b/nifi/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@@ -1,0 -1,403 +1,403 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.hadoop;
+ 
+ import java.io.FileNotFoundException;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ 
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.StopWatch;
+ import org.apache.nifi.util.Tuple;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FSDataOutputStream;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.ipc.RemoteException;
+ 
+ /**
+  * This processor copies FlowFiles to HDFS.
+  */
+ @Tags({"hadoop", "HDFS", "put", "copy", "filesystem"})
+ @CapabilityDescription("Write FlowFile data to Hadoop Distributed File System 
(HDFS)")
+ public class PutHDFS extends AbstractHadoopProcessor {
+ 
+     public static final String REPLACE_RESOLUTION = "replace"; // onTrigger deletes the existing file, then writes the new one
+     public static final String IGNORE_RESOLUTION = "ignore"; // onTrigger routes the FlowFile to success without writing it
+     public static final String FAIL_RESOLUTION = "fail"; // onTrigger penalizes the FlowFile and routes it to failure
+ 
+     public static final String BUFFER_SIZE_KEY = "io.file.buffer.size"; // Hadoop Configuration key read when "IO Buffer Size" is not set
+     public static final int BUFFER_SIZE_DEFAULT = 4096; // default handed to conf.getInt() for that key
+ 
+     // relationships
+     public static final Relationship REL_SUCCESS = new Relationship.Builder()
+             .name("success")
+             .description("Files that have been successfully written to HDFS 
are transferred to this relationship")
+             .build(); // also used by onTrigger for the "ignore" conflict resolution
+ 
+     public static final Relationship REL_FAILURE = new Relationship.Builder()
+             .name("failure")
+             .description(
+                     "Files that could not be written to HDFS for some reason 
are transferred to this relationship")
+             .build(); // onTrigger uses this for missing HDFS resources and the "fail" resolution; other errors roll the session back instead
+ 
+     // properties
+     public static final PropertyDescriptor DIRECTORY = new 
PropertyDescriptor.Builder()
+             .name(DIRECTORY_PROP_NAME) // constant is not declared in this class; NOTE(review): presumably inherited from AbstractHadoopProcessor
+             .description("The parent HDFS directory to which files should be 
written")
+             .required(true)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .expressionLanguageSupported(true) // onTrigger evaluates the expression against each FlowFile
+             .build();
+ 
+     public static final PropertyDescriptor CONFLICT_RESOLUTION = new 
PropertyDescriptor.Builder()
+             .name("Conflict Resolution Strategy")
+             .description("Indicates what should happen when a file with the 
same name already exists in the output directory")
+             .required(true)
+             .defaultValue(FAIL_RESOLUTION)
+             .allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, 
FAIL_RESOLUTION)
+             .build();
+ 
+     public static final PropertyDescriptor BLOCK_SIZE = new 
PropertyDescriptor.Builder()
+             .name("Block Size")
+             .description("Size of each block as written to HDFS. This 
overrides the Hadoop Configuration")
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .build(); // optional: when unset, onTrigger uses hdfs.getDefaultBlockSize() for the target directory
+ 
+     public static final PropertyDescriptor BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+             .name("IO Buffer Size")
+             .description("Amount of memory to use to buffer file contents 
during IO. This overrides the Hadoop Configuration")
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .build(); // optional: when unset, onTrigger reads BUFFER_SIZE_KEY from the Hadoop Configuration (default BUFFER_SIZE_DEFAULT)
+ 
+     public static final PropertyDescriptor REPLICATION_FACTOR = new 
PropertyDescriptor.Builder()
+             .name("Replication")
+             .description("Number of times that HDFS will replicate each file. 
This overrides the Hadoop Configuration")
+             .addValidator(createPositiveShortValidator())
+             .build(); // optional: when unset, onTrigger uses hdfs.getDefaultReplication() for the target directory
+ 
+     public static final PropertyDescriptor UMASK = new 
PropertyDescriptor.Builder()
+             .name("Permissions umask")
+             .description(
+                     "A umask represented as an octal number which determines 
the permissions of files written to HDFS. This overrides the Hadoop 
Configuration dfs.umaskmode")
+             .addValidator(createUmaskValidator())
+             .build(); // optional: parsed as base 8 in onScheduled; FsPermission.DEFAULT_UMASK is used when unset
+ 
+     public static final PropertyDescriptor REMOTE_OWNER = new 
PropertyDescriptor.Builder()
+             .name("Remote Owner")
+             .description(
+                     "Changes the owner of the HDFS file to this value after 
it is written. This only works if NiFi is running as a user that has HDFS super 
user privilege to change owner")
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .build(); // applied by changeOwner(); a failure there is only logged as a warning
+ 
+     public static final PropertyDescriptor REMOTE_GROUP = new 
PropertyDescriptor.Builder()
+             .name("Remote Group")
+             .description(
+                     "Changes the group of the HDFS file to this value after 
it is written. This only works if NiFi is running as a user that has HDFS super 
user privilege to change group")
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .build(); // applied by changeOwner(); a failure there is only logged as a warning
+ 
+     // Unmodifiable collections returned as-is by getRelationships() and getSupportedPropertyDescriptors().
+     private static final Set<Relationship> relationships;
+     private static final List<PropertyDescriptor> localProperties;
+ 
+     static {
+         final Set<Relationship> relationshipSet = new HashSet<>();
+         relationshipSet.add(REL_SUCCESS);
+         relationshipSet.add(REL_FAILURE);
+         relationships = Collections.unmodifiableSet(relationshipSet);
+ 
+         // Start from the `properties` collection (not declared in this class; presumably
+         // inherited from the parent processor) and append the PutHDFS-specific descriptors.
+         final List<PropertyDescriptor> descriptors = new ArrayList<>(properties);
+         descriptors.add(DIRECTORY);
+         descriptors.add(CONFLICT_RESOLUTION);
+         descriptors.add(BLOCK_SIZE);
+         descriptors.add(BUFFER_SIZE);
+         descriptors.add(REPLICATION_FACTOR);
+         descriptors.add(UMASK);
+         descriptors.add(REMOTE_OWNER);
+         descriptors.add(REMOTE_GROUP);
+         localProperties = Collections.unmodifiableList(descriptors);
+     }
+ 
+     /**
+      * @return the unmodifiable set holding {@link #REL_SUCCESS} and {@link #REL_FAILURE}
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     /**
+      * @return the unmodifiable descriptor list assembled in the static initializer
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return localProperties;
+     }
+ 
+     @OnScheduled
+     public void onScheduled(ProcessContext context) throws Exception {
+         super.abstractOnScheduled(context);
+ 
+         // Set umask once, to avoid thread safety issues doing it in onTrigger
+         final PropertyValue umaskProp = context.getProperty(UMASK);
+         final short dfsUmask;
+         if (umaskProp.isSet()) {
+             dfsUmask = Short.parseShort(umaskProp.getValue(), 8);
+         } else {
+             dfsUmask = FsPermission.DEFAULT_UMASK;
+         }
+         final Tuple<Configuration, FileSystem> resources = 
hdfsResources.get();
+         final Configuration conf = resources.getKey();
+         FsPermission.setUMask(conf, new FsPermission(dfsUmask));
+     }
+ 
+     @Override
+     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final Tuple<Configuration, FileSystem> resources = 
hdfsResources.get();
+         if (resources == null || resources.getKey() == null || 
resources.getValue() == null) {
+             getLogger().error("HDFS not configured properly");
+             session.transfer(flowFile, REL_FAILURE);
+             context.yield();
+             return;
+         }
+         final Configuration conf = resources.getKey();
+         final FileSystem hdfs = resources.getValue();
+ 
+         final Path configuredRootDirPath = new 
Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile)
+                 .getValue());
+         final String conflictResponse = 
context.getProperty(CONFLICT_RESOLUTION).getValue();
+ 
+         final Double blockSizeProp = 
context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
+         final long blockSize = blockSizeProp != null ? 
blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
+ 
+         final Double bufferSizeProp = 
context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
+         final int bufferSize = bufferSizeProp != null ? 
bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
+                 BUFFER_SIZE_DEFAULT);
+ 
+         final Integer replicationProp = 
context.getProperty(REPLICATION_FACTOR).asInteger();
+         final short replication = replicationProp != null ? 
replicationProp.shortValue() : hdfs
+                 .getDefaultReplication(configuredRootDirPath);
+ 
+         Path tempDotCopyFile = null;
+         try {
+             final Path tempCopyFile;
+             final Path copyFile;
+ 
+             tempCopyFile = new Path(configuredRootDirPath, "." + 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+             copyFile = new Path(configuredRootDirPath, 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ 
+             // Create destination directory if it does not exist
+             try {
+                 if (!hdfs.getFileStatus(configuredRootDirPath).isDir()) {
+                     throw new IOException(configuredRootDirPath.toString() + 
" already exists and is not a directory");
+                 }
+             } catch (FileNotFoundException fe) {
+                 if (!hdfs.mkdirs(configuredRootDirPath)) {
+                     throw new IOException(configuredRootDirPath.toString() + 
" could not be created");
+                 }
+                 changeOwner(context, hdfs, configuredRootDirPath);
+             }
+ 
+             // If destination file already exists, resolve that based on 
processor configuration
+             if (hdfs.exists(copyFile)) {
+                 switch (conflictResponse) {
+                     case REPLACE_RESOLUTION:
+                         if (hdfs.delete(copyFile, false)) {
+                             getLogger().info("deleted {} in order to replace 
with the contents of {}",
+                                     new Object[]{copyFile, flowFile});
+                         }
+                         break;
+                     case IGNORE_RESOLUTION:
+                         session.transfer(flowFile, REL_SUCCESS);
+                         getLogger().info("transferring {} to success because 
file with same name already exists",
+                                 new Object[]{flowFile});
+                         return;
+                     case FAIL_RESOLUTION:
+                         flowFile = session.penalize(flowFile);
+                         session.transfer(flowFile, REL_FAILURE);
+                         getLogger().warn("penalizing {} and routing to 
failure because file with same name already exists",
+                                 new Object[]{flowFile});
+                         return;
+                     default:
+                         break;
+                 }
+             }
+ 
+             // Write FlowFile to temp file on HDFS
+             final StopWatch stopWatch = new StopWatch(true);
+             session.read(flowFile, new InputStreamCallback() {
+ 
+                 @Override
+                 public void process(InputStream in) throws IOException {
+                     FSDataOutputStream fos = null;
+                     Path createdFile = null;
+                     try {
+                         fos = hdfs.create(tempCopyFile, true, bufferSize, 
replication, blockSize);
+                         createdFile = tempCopyFile;
+                         BufferedInputStream bis = new BufferedInputStream(in);
+                         StreamUtils.copy(bis, fos);
+                         bis = null;
+                         fos.flush();
+                     } finally {
+                         try {
+                             if (fos != null) {
+                                 fos.close();
+                             }
+                         } catch (RemoteException re) {
+                             // when talking to remote HDFS clusters, we don't 
notice problems until fos.close()
+                             if (createdFile != null) {
+                                 try {
+                                     hdfs.delete(createdFile, false);
+                                 } catch (Throwable ignore) {
+                                 }
+                             }
+                             throw re;
+                         } catch (Throwable ignore) {
+                         }
+                         fos = null;
+                     }
+                 }
+ 
+             });
+             stopWatch.stop();
+             final String dataRate = 
stopWatch.calculateDataRate(flowFile.getSize());
+             final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+             tempDotCopyFile = tempCopyFile;
+ 
+             boolean renamed = false;
+             for (int i = 0; i < 10; i++) { // try to rename multiple times.
+                 if (hdfs.rename(tempCopyFile, copyFile)) {
+                     renamed = true;
+                     break;// rename was successful
+                 }
+                 Thread.sleep(200L);// try waiting to let whatever might cause 
rename failure to resolve
+             }
+             if (!renamed) {
+                 hdfs.delete(tempCopyFile, false);
+                 throw new ProcessException("Copied file to HDFS but could not 
rename dot file " + tempCopyFile
+                         + " to its final filename");
+             }
+ 
+             changeOwner(context, hdfs, copyFile);
+ 
+             getLogger().info("copied {} to HDFS at {} in {} milliseconds at a 
rate of {}",
+                     new Object[]{flowFile, copyFile, millis, dataRate});
+ 
+             final String filename = copyFile.toString();
+             final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + 
filename : "hdfs://" + filename;
+             session.getProvenanceReporter().send(flowFile, transitUri);
+             session.transfer(flowFile, REL_SUCCESS);
+ 
+         } catch (final Throwable t) {
+             if (tempDotCopyFile != null) {
+                 try {
+                     hdfs.delete(tempDotCopyFile, false);
+                 } catch (Exception e) {
+                     getLogger().error("Unable to remove temporary file {} due 
to {}", new Object[]{tempDotCopyFile, e});
+                 }
+             }
+             getLogger().error("Failed to write to HDFS due to {}", t);
+             session.rollback();
+             context.yield();
+         }
+     }
+ 
+     protected void changeOwner(final ProcessContext context, final FileSystem 
hdfs, final Path name) {
+         try {
+             // Change owner and group of file if configured to do so
+             String owner = context.getProperty(REMOTE_OWNER).getValue();
+             String group = context.getProperty(REMOTE_GROUP).getValue();
+             if (owner != null || group != null) {
+                 hdfs.setOwner(name, owner, group);
+             }
+         } catch (Exception e) {
+             getLogger().warn("Could not change owner or group of {} on HDFS 
due to {}", new Object[]{name, e});
+         }
+     }
+ 
+     /**
+      * Builds a validator that accepts only values parseable as a {@code short} greater than zero.
+      *
+      * @return a validator whose result carries an explanation when the value is rejected
+      */
+     static Validator createPositiveShortValidator() {
+         return new Validator() {
+             @Override
+             public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+                 final String explanation = explain(value);
+                 return new ValidationResult.Builder()
+                         .subject(subject)
+                         .input(value)
+                         .explanation(explanation)
+                         .valid(explanation == null)
+                         .build();
+             }
+ 
+             // Returns null when the candidate is acceptable, otherwise the reason it is not.
+             private String explain(final String candidate) {
+                 final short parsed;
+                 try {
+                     parsed = Short.parseShort(candidate);
+                 } catch (final NumberFormatException e) {
+                     return "[" + candidate + "] is not a valid short integer";
+                 }
+                 return parsed > 0 ? null : "short integer must be greater than zero";
+             }
+         };
+     }
+ 
+     /**
+      * Builds a validator for umask values: the value must parse as a base-8 {@code short}
+      * in the range 0..511 (decimal).
+      *
+      * @return a validator whose result carries an explanation when the value is rejected
+      */
+     static Validator createUmaskValidator() {
+         return new Validator() {
+             @Override
+             public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+                 final String explanation = explain(value);
+                 return new ValidationResult.Builder()
+                         .subject(subject)
+                         .input(value)
+                         .explanation(explanation)
+                         .valid(explanation == null)
+                         .build();
+             }
+ 
+             // Returns null when the candidate is acceptable, otherwise the reason it is not.
+             private String explain(final String candidate) {
+                 final short parsed;
+                 try {
+                     parsed = Short.parseShort(candidate, 8);
+                 } catch (final NumberFormatException e) {
+                     return "[" + candidate + "] is not a valid short octal number";
+                 }
+                 if (parsed < 0) {
+                     return "octal umask [" + candidate + "] cannot be negative";
+                 }
+                 if (parsed > 511) {
+                     // HDFS umask has 9 bits: rwxrwxrwx ; the sticky bit cannot be umasked
+                     return "octal umask [" + candidate + "] is not a valid umask";
+                 }
+                 return null;
+             }
+         };
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 0000000,f202e29..5383e9d
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@@ -1,0 -1,330 +1,330 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.kafka;
+ 
+ import java.io.IOException;
+ import java.io.OutputStream;
+ import java.nio.charset.StandardCharsets;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Properties;
+ import java.util.Set;
+ import java.util.concurrent.BlockingQueue;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantLock;
+ 
+ import kafka.consumer.Consumer;
+ import kafka.consumer.ConsumerConfig;
+ import kafka.consumer.ConsumerIterator;
+ import kafka.consumer.KafkaStream;
+ import kafka.javaapi.consumer.ConsumerConnector;
+ import kafka.message.MessageAndMetadata;
+ 
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.lifecycle.OnStopped;
++import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.OnStopped;
 -import org.apache.nifi.processor.annotation.OnUnscheduled;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ 
+ @SupportsBatching
+ @CapabilityDescription("Fetches messages from Apache Kafka")
+ @Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
+ public class GetKafka extends AbstractProcessor {
+     public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new 
PropertyDescriptor.Builder()
+         .name("ZooKeeper Connection String")
+         .description("The Connection String to use in order to connect to 
ZooKeeper. This is often a comma-separated list of <host>:<port> combinations. 
For example, host1:2181,host2:2181,host3:2188")
+         .required(true)
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .expressionLanguageSupported(false)
+         .build(); // passed to the consumer as "zookeeper.connect" in createConsumers()
+     public static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+         .name("Topic Name")
+         .description("The Kafka Topic to pull messages from")
+         .required(true)
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .expressionLanguageSupported(false)
+         .build(); // also written to the "kafka.topic" FlowFile attribute in onTrigger
+     public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new 
PropertyDescriptor.Builder()
+               .name("Zookeeper Commit Frequency")
+               .description("Specifies how often to communicate with ZooKeeper 
to indicate which messages have been pulled. A longer time period will result 
in better overall performance but can result in more data duplication if a NiFi 
node is lost")
+               .required(true)
+               .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+               .expressionLanguageSupported(false)
+               .defaultValue("60 secs")
+               .build(); // converted to milliseconds for "auto.commit.interval.ms" in createConsumers()
+     public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new 
PropertyDescriptor.Builder()
+           .name("ZooKeeper Communications Timeout")
+           .description("The amount of time to wait for a response from 
ZooKeeper before determining that there is a communications error")
+           .required(true)
+           .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+           .expressionLanguageSupported(false)
+           .defaultValue("30 secs")
+           .build();
+     public static final PropertyDescriptor KAFKA_TIMEOUT = new 
PropertyDescriptor.Builder()
+           .name("Kafka Communications Timeout")
+           .description("The amount of time to wait for a response from Kafka 
before determining that there is a communications error")
+           .required(true)
+           .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+           .expressionLanguageSupported(false)
+           .defaultValue("30 secs")
+           .build();
+     public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+         .name("Batch Size")
+         .description("Specifies the maximum number of messages to combine 
into a single FlowFile. These messages will be "
+                 + "concatenated together with the <Message Demarcator> string 
placed between the content of each message. "
+                 + "If the messages from Kafka should not be concatenated 
together, leave this value at 1.")
+         .required(true)
+         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+         .expressionLanguageSupported(false)
+         .defaultValue("1")
+         .build();
+     public static final PropertyDescriptor MESSAGE_DEMARCATOR = new 
PropertyDescriptor.Builder()
+         .name("Message Demarcator")
+         .description("Specifies the characters to use in order to demarcate 
multiple messages from Kafka. If the <Batch Size> "
+                 + "property is set to 1, this value is ignored. Otherwise, 
for each two subsequent messages in the batch, "
+                 + "this value will be placed in between them.")
+         .required(true)
+         .addValidator(Validator.VALID)  // accept anything as a demarcator, 
including empty string
+         .expressionLanguageSupported(false)
+         .defaultValue("\\n") // literal backslash-n; onTrigger replaces the \n, \r and \t escape sequences with the real characters
+         .build();
+     public static final PropertyDescriptor CLIENT_NAME = new 
PropertyDescriptor.Builder()
+         .name("Client Name")
+         .description("Client Name to use when communicating with Kafka")
+         .required(true)
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .expressionLanguageSupported(false)
+         .build(); // no default here; getSupportedPropertyDescriptors() exposes a copy defaulting to "NiFi-" + getIdentifier()
+ 
+     public static final Relationship REL_SUCCESS = new Relationship.Builder()
+           .name("success")
+           .description("All FlowFiles that are created are routed to this 
relationship")
+           .build(); // the only relationship returned by getRelationships()
+ 
+     
+     private final BlockingQueue<ConsumerIterator<byte[], byte[]>> 
streamIterators = new LinkedBlockingQueue<>(); // cleared and refilled with one iterator per Kafka stream in createConsumers(); polled by getStreamIterator()
+     private volatile ConsumerConnector consumer; // assigned in createConsumers(); offsets committed and connector shut down in shutdownConsumer()
+ 
+     final Lock interruptionLock = new ReentrantLock(); // taken around every access to interruptableThreads
+     // guarded by interruptionLock
+     private final Set<Thread> interruptableThreads = new HashSet<>(); // threads registered by onTrigger; interrupted and cleared by interruptIterators()
+     
+     /**
+      * Lists the processor's properties. The Client Name descriptor is rebuilt on each call so
+      * that its default value can include this processor's identifier.
+      *
+      * @return a new, mutable list of the supported property descriptors
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+         descriptors.add(ZOOKEEPER_CONNECTION_STRING);
+         descriptors.add(TOPIC);
+         descriptors.add(ZOOKEEPER_COMMIT_DELAY);
+         descriptors.add(BATCH_SIZE);
+         descriptors.add(MESSAGE_DEMARCATOR);
+         descriptors.add(new PropertyDescriptor.Builder()
+                 .fromPropertyDescriptor(CLIENT_NAME)
+                 .defaultValue("NiFi-" + getIdentifier())
+                 .build());
+         descriptors.add(KAFKA_TIMEOUT);
+         descriptors.add(ZOOKEEPER_TIMEOUT);
+         return descriptors;
+     }
+     
+     /**
+      * @return a new set containing only {@link #REL_SUCCESS}
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         final Set<Relationship> onlySuccess = new HashSet<>(1);
+         onlySuccess.add(REL_SUCCESS);
+         return onlySuccess;
+     }
+     
+     /**
+      * Creates the Kafka consumer connector and one message stream per concurrent task, then
+      * replaces the contents of the stream-iterator queue with the new streams' iterators.
+      * <p>
+      * The consumer group id is this processor's identifier. Offsets are auto-committed at the
+      * configured commit frequency, and consumption starts from the smallest available offset
+      * when no offset is stored. The configured ZooKeeper and Kafka communications timeouts are
+      * applied to the consumer configuration.
+      *
+      * @param context supplies the topic, ZooKeeper connection string, commit frequency,
+      *            timeouts and the maximum number of concurrent tasks
+      */
+     @OnScheduled
+     public void createConsumers(final ProcessContext context) {
+         final String topic = context.getProperty(TOPIC).getValue();
+ 
+         // request one stream per concurrent task so each onTrigger thread can own an iterator
+         final Map<String, Integer> topicCountMap = new HashMap<>(1);
+         topicCountMap.put(topic, context.getMaxConcurrentTasks());
+ 
+         final Properties props = new Properties();
+         props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
+         props.setProperty("group.id", getIdentifier());
+         props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
+         props.setProperty("auto.commit.enable", "true"); // just be explicit
+         props.setProperty("auto.offset.reset", "smallest");
+         // Both timeout properties are required and user-visible, so they must reach the consumer;
+         // without these lines the Kafka defaults were used regardless of the configured values.
+         props.setProperty("zookeeper.connection.timeout.ms", String.valueOf(context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS)));
+         props.setProperty("socket.timeout.ms", String.valueOf(context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS)));
+         // NOTE(review): the Client Name property is still not passed on (e.g. as "client.id") - confirm whether it should be.
+ 
+         final ConsumerConfig consumerConfig = new ConsumerConfig(props);
+         consumer = Consumer.createJavaConsumerConnector(consumerConfig);
+ 
+         final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+         final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+ 
+         this.streamIterators.clear();
+ 
+         for (final KafkaStream<byte[], byte[]> stream : streams) {
+             streamIterators.add(stream.iterator());
+         }
+     }
+     
+     /**
+      * Commits the consumer's offsets and shuts the connector down. The shutdown is attempted
+      * even when the commit throws. Does nothing when no consumer has been created.
+      */
+     @OnStopped
+     public void shutdownConsumer() {
+         if (consumer == null) {
+             return;
+         }
+ 
+         try {
+             consumer.commitOffsets();
+         } finally {
+             consumer.shutdown();
+         }
+     }
+     
+     /**
+      * Interrupts every thread currently registered as interruptable and then empties the
+      * registry, all while holding {@code interruptionLock}.
+      * <p>
+      * Kafka doesn't provide a non-blocking API for pulling messages, so interrupting the
+      * threads is what gives us the ability to shut the Processor down.
+      */
+     @OnUnscheduled
+     public void interruptIterators() {
+         interruptionLock.lock();
+         try {
+             for (final Thread registered : interruptableThreads) {
+                 registered.interrupt();
+             }
+ 
+             interruptableThreads.clear();
+         } finally {
+             interruptionLock.unlock();
+         }
+     }
+     
+     protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
+         return streamIterators.poll();
+     }
+     
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+       ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
+       if ( iterator == null ) {
+               return;
+       }
+       
+       final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+       final String demarcator = 
context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", 
"\n").replace("\\r", "\r").replace("\\t", "\t");
+       final byte[] demarcatorBytes = 
demarcator.getBytes(StandardCharsets.UTF_8);
+       final String topic = context.getProperty(TOPIC).getValue();
+       
+       FlowFile flowFile = null;
+       try {
+           // add the current thread to the Set of those to be interrupted if 
processor stopped.
+               interruptionLock.lock();
+               try {
+                       interruptableThreads.add(Thread.currentThread());
+               } finally {
+                       interruptionLock.unlock();
+               }
+               
+               final long start = System.nanoTime();
+               flowFile = session.create();
+               
+               final Map<String, String> attributes = new HashMap<>();
+             attributes.put("kafka.topic", topic);
+ 
+             int numMessages = 0;
+               for (int msgCount = 0; msgCount < batchSize; msgCount++) {
+                   // if the processor is stopped, iterator.hasNext() will 
throw an Exception.
+                   // In this case, we just break out of the loop.
+                   try {
+                           if ( !iterator.hasNext() ) {
+                               break;
+                           }
+                   } catch (final Exception e) {
+                       break;
+                   }
+                   
+                       final MessageAndMetadata<byte[], byte[]> mam = 
iterator.next();
+                       if ( mam == null ) {
+                               return;
+                       }
+                       
+                       final byte[] key = mam.key();
+                       
+                       if ( batchSize == 1 ) {
+                           // the kafka.key, kafka.offset, and kafka.partition 
attributes are added only
+                           // for a batch size of 1.
+                           if ( key != null ) {
+                               attributes.put("kafka.key", new String(key, 
StandardCharsets.UTF_8));
+                           }
+                           
+                       attributes.put("kafka.offset", 
String.valueOf(mam.offset()));
+                       attributes.put("kafka.partition", 
String.valueOf(mam.partition()));
+                       }
+                       
+                       // add the message to the FlowFile's contents
+                       final boolean firstMessage = (msgCount == 0);
+                       flowFile = session.append(flowFile, new 
OutputStreamCallback() {
+                               @Override
+                               public void process(final OutputStream out) 
throws IOException {
+                                   if ( !firstMessage ) {
+                                       out.write(demarcatorBytes);
+                                   }
+                                       out.write(mam.message());
+                               }
+                       });
+                       numMessages++;
+               }
+               
+               // If we received no messages, remove the FlowFile. Otherwise, 
send to success.
+               if ( flowFile.getSize() == 0L ) {
+                   session.remove(flowFile);
+               } else {
+                       flowFile = session.putAllAttributes(flowFile, 
attributes);
+                       final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                       session.getProvenanceReporter().receive(flowFile, 
"kafka://" + topic, "Received " + numMessages + " Kafka messages", millis);
+                       getLogger().info("Successfully received {} from Kafka 
with {} messages in {} millis", new Object[] {flowFile, numMessages, millis});
+                       session.transfer(flowFile, REL_SUCCESS);
+               }
+       } catch (final Exception e) {
+               getLogger().error("Failed to receive FlowFile from Kafka due to 
{}", new Object[] {e});
+               if ( flowFile != null ) {
+                       session.remove(flowFile);
+               }
+       } finally {
+           // Remove the current thread from the Set of Threads to interrupt.
+               interruptionLock.lock();
+               try {
+                       interruptableThreads.remove(Thread.currentThread());
+               } finally {
+                       interruptionLock.unlock();
+               }
+               
+               // Add the iterator back to the queue
+               if ( iterator != null ) {
+                       streamIterators.offer(iterator);
+               }
+       }
+     }
+       
+ }

Reply via email to