NIFI-3366 This closes #2332. Added a parent/child flowfile relationship between
the incoming flowfile and the files that are moved from the input directory to
the output directory (see the sketch below).
Updated the tests so they can verify that properties supporting expression
language are actually evaluated.
Fixed a bug where changeOwner attempted to operate on the original file rather
than the moved/copied file.
Added the license header to MoveHDFSTest.java.
Added a test for moving a directory of files that contains a subdirectory,
ensuring non-recursive behavior.
Added a note to the processor description that it is non-recursive when a
directory is used as input.
Added a RAT exclude to pom.xml for the .dotfile test resource.

Signed-off-by: joewitt <[email protected]>
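
For context on the first item: each file moved in a batch is now created as a
child of the flowfile that triggered the run, and the parent is removed once
the batch completes, so provenance links each moved file back to its trigger.
A minimal sketch of the pattern, condensed from the onTrigger and
processBatchOfFiles changes in the diff below (error handling and conflict
resolution omitted):

    // The triggering flowfile acts only as a signal; its children carry the results.
    FlowFile parent = session.get();
    if (parent == null) {
        return;                                   // nothing triggered this run
    }
    for (final Path file : files) {
        FlowFile child = session.create(parent);  // child inherits lineage from parent
        child = session.putAttribute(child, CoreAttributes.FILENAME.key(), file.getName());
        session.transfer(child, REL_SUCCESS);
    }
    session.remove(parent);                       // the parent has served its purpose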


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/600586d6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/600586d6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/600586d6

Branch: refs/heads/master
Commit: 600586d6be1ee2b7f8fb9397623a2bc8d72df1b7
Parents: 3731fbe
Author: Jeff Storck <[email protected]>
Authored: Thu Dec 7 15:39:22 2017 -0500
Committer: joewitt <[email protected]>
Committed: Mon Dec 11 08:41:36 2017 -0500

----------------------------------------------------------------------
 .../nifi-hdfs-processors/pom.xml                |  15 +
 .../apache/nifi/processors/hadoop/MoveHDFS.java | 880 +++++++++----------
 .../nifi/processors/hadoop/MoveHDFSTest.java    | 408 +++++----
 .../src/test/resources/testdata/.dotfile        |   0
 4 files changed, 685 insertions(+), 618 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/600586d6/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index e6b8b91..32576e1 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -66,4 +66,19 @@
             <artifactId>nifi-properties</artifactId>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/testdata/.dotfile</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/600586d6/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
index e9842b7..5c889d6 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -60,145 +61,141 @@ import org.apache.nifi.util.StopWatch;
 /**
  * This processor renames files on HDFS.
  */
-@Tags({ "hadoop", "HDFS", "put", "move", "filesystem", "restricted", 
"moveHDFS" })
-@CapabilityDescription("Rename existing files on Hadoop Distributed File 
System (HDFS)")
+@Tags({"hadoop", "HDFS", "put", "move", "filesystem", "restricted", 
"moveHDFS"})
+@CapabilityDescription("Rename existing files or a directory of files 
(non-recursive) on Hadoop Distributed File System (HDFS).")
 @ReadsAttribute(attribute = "filename", description = "The name of the file 
written to HDFS comes from the value of this attribute.")
 @WritesAttributes({
-               @WritesAttribute(attribute = "filename", description = "The 
name of the file written to HDFS is stored in this attribute."),
-               @WritesAttribute(attribute = "absolute.hdfs.path", description 
= "The absolute path to the file on HDFS is stored in this attribute.") })
-@SeeAlso({ PutHDFS.class, GetHDFS.class })
+        @WritesAttribute(attribute = "filename", description = "The name of 
the file written to HDFS is stored in this attribute."),
+        @WritesAttribute(attribute = "absolute.hdfs.path", description = "The 
absolute path to the file on HDFS is stored in this attribute.")})
+@SeeAlso({PutHDFS.class, GetHDFS.class})
 public class MoveHDFS extends AbstractHadoopProcessor {
 
-       // static global
-       public static final int MAX_WORKING_QUEUE_SIZE = 25000;
-       public static final String REPLACE_RESOLUTION = "replace";
-       public static final String IGNORE_RESOLUTION = "ignore";
-       public static final String FAIL_RESOLUTION = "fail";
-
-       private static final Set<Relationship> relationships;
-
-       public static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
-                       REPLACE_RESOLUTION, "Replaces the existing file if any.");
-       public static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
-                       "Failed rename operation stops processing and routes to success.");
-       public static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
-                       "Failing to rename a file routes to failure.");
-
-       public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
-       public static final int BUFFER_SIZE_DEFAULT = 4096;
-
-       public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
-
-       // relationships
-       public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
-                       .description("Files that have been successfully renamed on HDFS are transferred to this relationship")
-                       .build();
-
-       public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
-                       .description("Files that could not be renamed on HDFS are transferred to this relationship").build();
-
-       // properties
-       public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
-                       .name("Conflict Resolution Strategy")
-                       .description(
-                                       "Indicates what should happen when a file with the same name already exists in the output directory")
-                       .required(true).defaultValue(FAIL_RESOLUTION_AV.getValue())
-                       .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV).build();
-
-       public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder()
-                       .name("File Filter Regex")
-                       .description(
-                                       "A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular "
-                                                       + "Expression will be fetched, otherwise all files will be fetched")
-                       .required(false).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
-
-       public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
-                       .name("Ignore Dotted Files")
-                       .description("If true, files whose names begin with a dot (\".\") will be ignored").required(true)
-                       .allowableValues("true", "false").defaultValue("true").build();
-
-       public static final PropertyDescriptor INPUT_DIRECTORY_OR_FILE = new PropertyDescriptor.Builder()
-                       .name("Input Directory or File")
-                       .description("The HDFS directory from which files should be read, or a single file to read")
-                       .defaultValue("${path}").addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
-                       .expressionLanguageSupported(true).build();
-
-       public static final PropertyDescriptor OUTPUT_DIRECTORY = new PropertyDescriptor.Builder().name("Output Directory")
-                       .description("The HDFS directory where the files will be moved to").required(true)
-                       .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).expressionLanguageSupported(true)
-                       .build();
-
-       public static final PropertyDescriptor OPERATION = new PropertyDescriptor.Builder().name("HDFS Operation")
-                       .description("The operation that will be performed on the source file").required(true)
-                       .allowableValues("move", "copy").defaultValue("move").build();
-
-       public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder().name("Remote Owner")
-                       .description(
-                                       "Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
-                       .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
-
-       public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder().name("Remote Group")
-                       .description(
-                                       "Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group")
-                       .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
-
-       static {
-               final Set<Relationship> rels = new HashSet<>();
-               rels.add(REL_SUCCESS);
-               rels.add(REL_FAILURE);
-               relationships = Collections.unmodifiableSet(rels);
-       }
-
-       // non-static global
-       protected ProcessorConfiguration processorConfig;
-       private final AtomicLong logEmptyListing = new AtomicLong(2L);
-
-       private final Lock listingLock = new ReentrantLock();
-       private final Lock queueLock = new ReentrantLock();
-
-       private final BlockingQueue<Path> filePathQueue = new LinkedBlockingQueue<>();
-       private final BlockingQueue<Path> processing = new LinkedBlockingQueue<>();
-
-       // methods
-       @Override
-       public Set<Relationship> getRelationships() {
-               return relationships;
-       }
-
-       @Override
-       protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-               List<PropertyDescriptor> props = new ArrayList<>(properties);
-               props.add(CONFLICT_RESOLUTION);
-               props.add(INPUT_DIRECTORY_OR_FILE);
-               props.add(OUTPUT_DIRECTORY);
-               props.add(OPERATION);
-               props.add(FILE_FILTER_REGEX);
-               props.add(IGNORE_DOTTED_FILES);
-               props.add(REMOTE_OWNER);
-               props.add(REMOTE_GROUP);
-               return props;
-       }
-
-       @OnScheduled
-       public void onScheduled(ProcessContext context) throws Exception {
-               super.abstractOnScheduled(context);
-               // copy configuration values to pass them around cleanly
-               processorConfig = new ProcessorConfiguration(context);
-               // forget the state of the queue in case HDFS contents changed while
-               // this processor was turned off
-               queueLock.lock();
-               try {
-                       filePathQueue.clear();
-                       processing.clear();
-               } finally {
-                       queueLock.unlock();
-               }
-       }
-
-       @Override
-       public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-               // MoveHDFS
-               FlowFile parentFlowFile = session.get();
+    // static global
+    public static final String REPLACE_RESOLUTION = "replace";
+    public static final String IGNORE_RESOLUTION = "ignore";
+    public static final String FAIL_RESOLUTION = "fail";
+
+    private static final Set<Relationship> relationships;
+
+    public static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
+            REPLACE_RESOLUTION, "Replaces the existing file if any.");
+    public static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
+            "Failed rename operation stops processing and routes to success.");
+    public static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
+            "Failing to rename a file routes to failure.");
+
+    public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
+
+    // relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("Files that have been successfully renamed on HDFS are transferred to this relationship")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("Files that could not be renamed on HDFS are transferred to this relationship").build();
+
+    // properties
+    public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+            .name("Conflict Resolution Strategy")
+            .description(
+                    "Indicates what should happen when a file with the same name already exists in the output directory")
+            .required(true).defaultValue(FAIL_RESOLUTION_AV.getValue())
+            .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV).build();
+
+    public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder()
+            .name("File Filter Regex")
+            .description(
+                    "A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular "
+                            + "Expression will be fetched, otherwise all files will be fetched")
+            .required(false).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
+
+    public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
+            .name("Ignore Dotted Files")
+            .description("If true, files whose names begin with a dot (\".\") will be ignored").required(true)
+            .allowableValues("true", "false").defaultValue("true").build();
+
+    public static final PropertyDescriptor INPUT_DIRECTORY_OR_FILE = new PropertyDescriptor.Builder()
+            .name("Input Directory or File")
+            .description("The HDFS directory from which files should be read, or a single file to read.")
+            .defaultValue("${path}").addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .expressionLanguageSupported(true).build();
+
+    public static final PropertyDescriptor OUTPUT_DIRECTORY = new PropertyDescriptor.Builder().name("Output Directory")
+            .description("The HDFS directory where the files will be moved to").required(true)
+            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor OPERATION = new PropertyDescriptor.Builder().name("HDFS Operation")
+            .description("The operation that will be performed on the source file").required(true)
+            .allowableValues("move", "copy").defaultValue("move").build();
+
+    public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder().name("Remote Owner")
+            .description(
+                    "Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder().name("Remote Group")
+            .description(
+                    "Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+
+    static {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(rels);
+    }
+
+    // non-static global
+    protected ProcessorConfiguration processorConfig;
+    private final AtomicLong logEmptyListing = new AtomicLong(2L);
+
+    private final Lock listingLock = new ReentrantLock();
+    private final Lock queueLock = new ReentrantLock();
+
+    private final BlockingQueue<Path> filePathQueue = new LinkedBlockingQueue<>();
+    private final BlockingQueue<Path> processing = new LinkedBlockingQueue<>();
+
+    // methods
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        List<PropertyDescriptor> props = new ArrayList<>(properties);
+        props.add(CONFLICT_RESOLUTION);
+        props.add(INPUT_DIRECTORY_OR_FILE);
+        props.add(OUTPUT_DIRECTORY);
+        props.add(OPERATION);
+        props.add(FILE_FILTER_REGEX);
+        props.add(IGNORE_DOTTED_FILES);
+        props.add(REMOTE_OWNER);
+        props.add(REMOTE_GROUP);
+        return props;
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws Exception {
+        super.abstractOnScheduled(context);
+        // copy configuration values to pass them around cleanly
+        processorConfig = new ProcessorConfiguration(context);
+        // forget the state of the queue in case HDFS contents changed while
+        // this processor was turned off
+        queueLock.lock();
+        try {
+            filePathQueue.clear();
+            processing.clear();
+        } finally {
+            queueLock.unlock();
+        }
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        // MoveHDFS
+        FlowFile parentFlowFile = session.get();
         if (parentFlowFile == null) {
             return;
         }
@@ -209,319 +206,318 @@ public class MoveHDFS extends AbstractHadoopProcessor {
         Path inputPath = null;
         try {
             inputPath = new Path(filenameValue);
-            if(!hdfs.exists(inputPath)) {
-               throw new IOException("Input Directory or File does not exist in HDFS");
+            if (!hdfs.exists(inputPath)) {
+                throw new IOException("Input Directory or File does not exist in HDFS");
             }
         } catch (Exception e) {
-            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, parentFlowFile, e});
+            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{filenameValue, parentFlowFile, e});
             parentFlowFile = session.putAttribute(parentFlowFile, "hdfs.failure.reason", e.getMessage());
             parentFlowFile = session.penalize(parentFlowFile);
-               session.transfer(parentFlowFile, REL_FAILURE);
+            session.transfer(parentFlowFile, REL_FAILURE);
+            return;
+        }
+
+        List<Path> files = new ArrayList<Path>();
+
+        try {
+            final StopWatch stopWatch = new StopWatch(true);
+            Set<Path> listedFiles = performListing(context, inputPath);
+            stopWatch.stop();
+            final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+
+            if (listedFiles != null) {
+                // place files into the work queue
+                int newItems = 0;
+                queueLock.lock();
+                try {
+                    for (Path file : listedFiles) {
+                        if (!filePathQueue.contains(file) && !processing.contains(file)) {
+                            if (!filePathQueue.offer(file)) {
+                                break;
+                            }
+                            newItems++;
+                        }
+                    }
+                } catch (Exception e) {
+                    getLogger().warn("Could not add to processing queue due to {}", new Object[]{e.getMessage()}, e);
+                } finally {
+                    queueLock.unlock();
+                }
+                if (listedFiles.size() > 0) {
+                    logEmptyListing.set(3L);
+                }
+                if (logEmptyListing.getAndDecrement() > 0) {
+                    getLogger().info(
+                            "Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
+                            new Object[]{millis, listedFiles.size(), newItems});
+                }
+            }
+        } catch (IOException e) {
+            context.yield();
+            getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e});
             return;
         }
+
+        // prepare to process a batch of files in the queue
+        queueLock.lock();
+        try {
+            filePathQueue.drainTo(files);
+            if (files.isEmpty()) {
+                // nothing to do!
+                session.remove(parentFlowFile);
+                context.yield();
+                return;
+            }
+        } finally {
+            queueLock.unlock();
+        }
+
+        processBatchOfFiles(files, context, session, parentFlowFile);
+
+        queueLock.lock();
+        try {
+            processing.removeAll(files);
+        } finally {
+            queueLock.unlock();
+        }
+
         session.remove(parentFlowFile);
+    }
+
+    protected void processBatchOfFiles(final List<Path> files, final ProcessContext context,
+                                       final ProcessSession session, FlowFile parentFlowFile) {
+        Preconditions.checkState(parentFlowFile != null, "No parent flowfile for this batch was provided");
+
+        // process the batch of files
+        final Configuration conf = getConfiguration();
+        final FileSystem hdfs = getFileSystem();
+        final UserGroupInformation ugi = getUserGroupInformation();
+
+        if (conf == null || ugi == null) {
+            getLogger().error("Configuration or UserGroupInformation not configured properly");
+            session.transfer(parentFlowFile, REL_FAILURE);
+            context.yield();
+            return;
+        }
+
+        for (final Path file : files) {
+
+            ugi.doAs(new PrivilegedAction<Object>() {
+                @Override
+                public Object run() {
+                    FlowFile flowFile = session.create(parentFlowFile);
+                    try {
+                        final String originalFilename = file.getName();
+                        final Path configuredRootOutputDirPath = processorConfig.getOutputDirectory();
+                        final Path newFile = new Path(configuredRootOutputDirPath, originalFilename);
+                        final boolean destinationExists = hdfs.exists(newFile);
+                        // If destination file already exists, resolve that
+                        // based on processor configuration
+                        if (destinationExists) {
+                            switch (processorConfig.getConflictResolution()) {
+                                case REPLACE_RESOLUTION:
+                                    if (hdfs.delete(file, false)) {
+                                        getLogger().info("deleted {} in order to replace with the contents of {}",
+                                                new Object[]{file, flowFile});
+                                    }
+                                    break;
+                                case IGNORE_RESOLUTION:
+                                    session.transfer(flowFile, REL_SUCCESS);
+                                    getLogger().info(
+                                            "transferring {} to success because file with same name already exists",
+                                            new Object[]{flowFile});
+                                    return null;
+                                case FAIL_RESOLUTION:
+                                    session.transfer(session.penalize(flowFile), REL_FAILURE);
+                                    getLogger().warn(
+                                            "penalizing {} and routing to failure because file with same name already exists",
+                                            new Object[]{flowFile});
+                                    return null;
+                                default:
+                                    break;
+                            }
+                        }
+
+                        // Create destination directory if it does not exist
+                        try {
+                            if (!hdfs.getFileStatus(configuredRootOutputDirPath).isDirectory()) {
+                                throw new IOException(configuredRootOutputDirPath.toString()
+                                        + " already exists and is not a directory");
+                            }
+                        } catch (FileNotFoundException fe) {
+                            if (!hdfs.mkdirs(configuredRootOutputDirPath)) {
+                                throw new IOException(configuredRootOutputDirPath.toString() + " could not be created");
+                            }
+                            changeOwner(context, hdfs, configuredRootOutputDirPath);
+                        }
+
+                        boolean moved = false;
+                        for (int i = 0; i < 10; i++) { // try to rename multiple
+                            // times.
+                            if (processorConfig.getOperation().equals("move")) {
+                                if (hdfs.rename(file, newFile)) {
+                                    moved = true;
+                                    break;// rename was successful
+                                }
+                            } else {
+                                if (FileUtil.copy(hdfs, file, hdfs, newFile, false, conf)) {
+                                    moved = true;
+                                    break;// copy was successful
+                                }
+                            }
+                            Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
+                        }
+                        if (!moved) {
+                            throw new ProcessException("Could not move file " + file + " to its final filename");
+                        }
+
+                        changeOwner(context, hdfs, newFile);
+                        final String outputPath = newFile.toString();
+                        final String newFilename = newFile.getName();
+                        final String hdfsPath = newFile.getParent().toString();
+                        flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
+                        flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+                        final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath
+                                : "hdfs://" + outputPath;
+                        session.getProvenanceReporter().send(flowFile, transitUri);
+                        session.transfer(flowFile, REL_SUCCESS);
+
+                    } catch (final Throwable t) {
+                        getLogger().error("Failed to rename on HDFS due to {}", new Object[]{t});
+                        session.transfer(session.penalize(flowFile), REL_FAILURE);
+                        context.yield();
+                    }
+                    return null;
+                }
+            });
+        }
+    }
+
+    protected Set<Path> performListing(final ProcessContext context, Path path) throws IOException {
+        Set<Path> listing = null;
+
+        if (listingLock.tryLock()) {
+            try {
+                final FileSystem hdfs = getFileSystem();
+                // get listing
+                listing = selectFiles(hdfs, path, null);
+            } finally {
+                listingLock.unlock();
+            }
+        }
 
-               List<Path> files = new ArrayList<Path>();
-
-               try {
-                       final StopWatch stopWatch = new StopWatch(true);
-                       Set<Path> listedFiles = performListing(context, inputPath);
-                       stopWatch.stop();
-                       final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-
-                       if (listedFiles != null) {
-                               // place files into the work queue
-                               int newItems = 0;
-                               queueLock.lock();
-                               try {
-                                       for (Path file : listedFiles) {
-                                               if (!filePathQueue.contains(file) && !processing.contains(file)) {
-                                                       if (!filePathQueue.offer(file)) {
-                                                               break;
-                                                       }
-                                                       newItems++;
-                                               }
-                                       }
-                               } catch (Exception e) {
-                                       getLogger().warn("Could not add to processing queue due to {}", new Object[] { e });
-                               } finally {
-                                       queueLock.unlock();
-                               }
-                               if (listedFiles.size() > 0) {
-                                       logEmptyListing.set(3L);
-                               }
-                               if (logEmptyListing.getAndDecrement() > 0) {
-                                       getLogger().info(
-                                                       "Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
-                                                       new Object[] { millis, listedFiles.size(), newItems });
-                               }
-                       }
-               } catch (IOException e) {
-                       context.yield();
-                       getLogger().warn("Error while retrieving list of files due to {}", new Object[] { e });
-                       return;
-               }
-
-               // prepare to process a batch of files in the queue
-               queueLock.lock();
-               try {
-                       filePathQueue.drainTo(files);
-                       if (files.isEmpty()) {
-                               // nothing to do!
-                               context.yield();
-                               return;
-                       }
-               } finally {
-                       queueLock.unlock();
-               }
-
-               processBatchOfFiles(files, context, session);
-
-               queueLock.lock();
-               try {
-                       processing.removeAll(files);
-               } finally {
-                       queueLock.unlock();
-               }
-       }
-
-       protected void processBatchOfFiles(final List<Path> files, final ProcessContext context,
-                       final ProcessSession session) {
-               // process the batch of files
-               final Configuration conf = getConfiguration();
-               final FileSystem hdfs = getFileSystem();
-               final UserGroupInformation ugi = getUserGroupInformation();
-
-               if (conf == null || ugi == null) {
-                       getLogger().error("Configuration or UserGroupInformation not configured properly");
-                       session.transfer(session.get(), REL_FAILURE);
-                       context.yield();
-                       return;
-               }
-
-               for (final Path file : files) {
-
-                       ugi.doAs(new PrivilegedAction<Object>() {
-                               @Override
-                               public Object run() {
-                                       FlowFile flowFile = session.create();
-                                       try {
-                                               final String originalFilename = file.getName();
-                                               final Path configuredRootOutputDirPath = processorConfig.getOutputDirectory();
-                                               final Path newFile = new Path(configuredRootOutputDirPath, originalFilename);
-                                               final boolean destinationExists = hdfs.exists(newFile);
-                                               // If destination file already exists, resolve that
-                                               // based on processor configuration
-                                               if (destinationExists) {
-                                                       switch (processorConfig.getConflictResolution()) {
-                                                       case REPLACE_RESOLUTION:
-                                                               if (hdfs.delete(file, false)) {
-                                                                       getLogger().info("deleted {} in order to replace with the contents of {}",
-                                                                                       new Object[] { file, flowFile });
-                                                               }
-                                                               break;
-                                                       case IGNORE_RESOLUTION:
-                                                               session.transfer(flowFile, REL_SUCCESS);
-                                                               getLogger().info(
-                                                                               "transferring {} to success because file with same name already exists",
-                                                                               new Object[] { flowFile });
-                                                               return null;
-                                                       case FAIL_RESOLUTION:
-                                                               session.transfer(session.penalize(flowFile), REL_FAILURE);
-                                                               getLogger().warn(
-                                                                               "penalizing {} and routing to failure because file with same name already exists",
-                                                                               new Object[] { flowFile });
-                                                               return null;
-                                                       default:
-                                                               break;
-                                                       }
-                                               }
-
-                                               // Create destination directory if it does not exist
-                                               try {
-                                                       if (!hdfs.getFileStatus(configuredRootOutputDirPath).isDirectory()) {
-                                                               throw new IOException(configuredRootOutputDirPath.toString()
-                                                                               + " already exists and is not a directory");
-                                                       }
-                                               } catch (FileNotFoundException fe) {
-                                                       if (!hdfs.mkdirs(configuredRootOutputDirPath)) {
-                                                               throw new IOException(configuredRootOutputDirPath.toString() + " could not be created");
-                                                       }
-                                                       changeOwner(context, hdfs, configuredRootOutputDirPath);
-                                               }
-
-                                               boolean moved = false;
-                                               for (int i = 0; i < 10; i++) { // try to rename multiple
-                                                       // times.
-                                                       if (processorConfig.getOperation().equals("move")) {
-                                                               if (hdfs.rename(file, newFile)) {
-                                                                       moved = true;
-                                                                       break;// rename was successful
-                                                               }
-                                                       } else {
-                                                               if (FileUtil.copy(hdfs, file, hdfs, newFile, false, conf)) {
-                                                                       moved = true;
-                                                                       break;// copy was successful
-                                                               }
-                                                       }
-                                                       Thread.sleep(200L);// try waiting to let whatever
-                                                       // might cause rename failure to
-                                                       // resolve
-                                               }
-                                               if (!moved) {
-                                                       throw new ProcessException("Could not move file " + file + " to its final filename");
-                                               }
-
-                                               changeOwner(context, hdfs, file);
-                                               final String outputPath = newFile.toString();
-                                               final String newFilename = newFile.getName();
-                                               final String hdfsPath = newFile.getParent().toString();
-                                               flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
-                                               flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
-                                               final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath
-                                                               : "hdfs://" + outputPath;
-                                               session.getProvenanceReporter().send(flowFile, transitUri);
-                                               session.transfer(flowFile, REL_SUCCESS);
-
-                                       } catch (final Throwable t) {
-                                               getLogger().error("Failed to rename on HDFS due to {}", new Object[] { t });
-                                               session.transfer(session.penalize(flowFile), REL_FAILURE);
-                                               context.yield();
-                                       }
-                                       return null;
-                               }
-                       });
-               }
-       }
-
-       protected Set<Path> performListing(final ProcessContext context, Path path) throws IOException {
-               Set<Path> listing = null;
-
-               if (listingLock.tryLock()) {
-                       try {
-                               final FileSystem hdfs = getFileSystem();
-                               // get listing
-                               listing = selectFiles(hdfs, path, null);
-                       } finally {
-                               listingLock.unlock();
-                       }
-               }
-
-               return listing;
-       }
-
-       protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name) {
-               try {
-                       // Change owner and group of file if configured to do so
-                       String owner = context.getProperty(REMOTE_OWNER).getValue();
-                       String group = context.getProperty(REMOTE_GROUP).getValue();
-                       if (owner != null || group != null) {
-                               hdfs.setOwner(name, owner, group);
-                       }
-               } catch (Exception e) {
-                       getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[] { name, e });
-               }
-       }
-
-       protected Set<Path> selectFiles(final FileSystem hdfs, final Path inputPath, Set<Path> filesVisited)
-                       throws IOException {
-               if (null == filesVisited) {
-                       filesVisited = new HashSet<>();
-               }
-
-               if (!hdfs.exists(inputPath)) {
-                       throw new IOException("Selection directory " + inputPath.toString() + " doesn't appear to exist!");
-               }
-
-               final Set<Path> files = new HashSet<>();
-
-               FileStatus inputStatus = hdfs.getFileStatus(inputPath);
-
-               if (inputStatus.isDirectory()) {
-                       for (final FileStatus file : hdfs.listStatus(inputPath)) {
-                               final Path canonicalFile = file.getPath();
-
-                               if (!filesVisited.add(canonicalFile)) { // skip files we've already
-                                       // seen (may be looping
-                                       // directory links)
-                                       continue;
-                               }
-
-                               if (!file.isDirectory() && processorConfig.getPathFilter(inputPath).accept(canonicalFile)) {
-                                       files.add(canonicalFile);
-
-                                       if (getLogger().isDebugEnabled()) {
-                                               getLogger().debug(this + " selected file at path: " + canonicalFile.toString());
-                                       }
-                               }
-                       }
-               } else if (inputStatus.isFile()) {
-                       files.add(inputPath);
-               }
-               return files;
-       }
-
-       protected static class ProcessorConfiguration {
-
-               final private String conflictResolution;
-               final private String operation;
-               final private Path inputRootDirPath;
-               final private Path outputRootDirPath;
-               final private Pattern fileFilterPattern;
-               final private boolean ignoreDottedFiles;
-
-               ProcessorConfiguration(final ProcessContext context) {
-                       conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
-                       operation = context.getProperty(OPERATION).getValue();
-                       final String inputDirValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).getValue();
-                       inputRootDirPath = new Path(inputDirValue);
-                       final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).getValue();
-                       outputRootDirPath = new Path(outputDirValue);
-                       final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
-                       fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex);
-                       ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
-               }
-
-               public String getOperation() {
-                       return operation;
-               }
-
-               public String getConflictResolution() {
-                       return conflictResolution;
-               }
-
-               public Path getInput() {
-                       return inputRootDirPath;
-               }
-
-               public Path getOutputDirectory() {
-                       return outputRootDirPath;
-               }
-
-               protected PathFilter getPathFilter(final Path dir) {
-                       return new PathFilter() {
-
-                               @Override
-                               public boolean accept(Path path) {
-                                       if (ignoreDottedFiles && path.getName().startsWith(".")) {
-                                               return false;
-                                       }
-                                       final String pathToCompare;
-                                       String relativePath = getPathDifference(dir, path);
-                                       if (relativePath.length() == 0) {
-                                               pathToCompare = path.getName();
-                                       } else {
-                                               pathToCompare = relativePath + Path.SEPARATOR + path.getName();
-                                       }
-
-                                       if (fileFilterPattern != null && !fileFilterPattern.matcher(pathToCompare).matches()) {
-                                               return false;
-                                       }
-                                       return true;
-                               }
-
-                       };
-               }
-       }
+        return listing;
+    }
+
+    protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name) {
+        try {
+            // Change owner and group of file if configured to do so
+            String owner = context.getProperty(REMOTE_OWNER).getValue();
+            String group = context.getProperty(REMOTE_GROUP).getValue();
+            if (owner != null || group != null) {
+                hdfs.setOwner(name, owner, group);
+            }
+        } catch (Exception e) {
+            getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e.getMessage()}, e);
+        }
+    }
+
+    protected Set<Path> selectFiles(final FileSystem hdfs, final Path inputPath, Set<Path> filesVisited)
+            throws IOException {
+        if (null == filesVisited) {
+            filesVisited = new HashSet<>();
+        }
+
+        if (!hdfs.exists(inputPath)) {
+            throw new IOException("Selection directory " + inputPath.toString() + " doesn't appear to exist!");
+        }
+
+        final Set<Path> files = new HashSet<>();
+
+        FileStatus inputStatus = hdfs.getFileStatus(inputPath);
+
+        if (inputStatus.isDirectory()) {
+            for (final FileStatus file : hdfs.listStatus(inputPath)) {
+                final Path canonicalFile = file.getPath();
+
+                if (!filesVisited.add(canonicalFile)) { // skip files we've already seen (may be looping directory links)
+                    continue;
+                }
+
+                if (!file.isDirectory() && processorConfig.getPathFilter(inputPath).accept(canonicalFile)) {
+                    files.add(canonicalFile);
+
+                    if (getLogger().isDebugEnabled()) {
+                        getLogger().debug(this + " selected file at path: " + canonicalFile.toString());
+                    }
+                }
+            }
+        } else if (inputStatus.isFile()) {
+            files.add(inputPath);
+        }
+        return files;
+    }
+
+    protected static class ProcessorConfiguration {
+
+        final private String conflictResolution;
+        final private String operation;
+        final private Path inputRootDirPath;
+        final private Path outputRootDirPath;
+        final private Pattern fileFilterPattern;
+        final private boolean ignoreDottedFiles;
+
+        ProcessorConfiguration(final ProcessContext context) {
+            conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
+            operation = context.getProperty(OPERATION).getValue();
+            final String inputDirValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).evaluateAttributeExpressions().getValue();
+            inputRootDirPath = new Path(inputDirValue);
+            final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).evaluateAttributeExpressions().getValue();
+            outputRootDirPath = new Path(outputDirValue);
+            final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
+            fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex);
+            ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
+        }
+
+        public String getOperation() {
+            return operation;
+        }
+
+        public String getConflictResolution() {
+            return conflictResolution;
+        }
+
+        public Path getInput() {
+            return inputRootDirPath;
+        }
+
+        public Path getOutputDirectory() {
+            return outputRootDirPath;
+        }
+
+        protected PathFilter getPathFilter(final Path dir) {
+            return new PathFilter() {
+
+                @Override
+                public boolean accept(Path path) {
+                    if (ignoreDottedFiles && path.getName().startsWith(".")) {
+                        return false;
+                    }
+                    final String pathToCompare;
+                    String relativePath = getPathDifference(dir, path);
+                    if (relativePath.length() == 0) {
+                        pathToCompare = path.getName();
+                    } else {
+                        pathToCompare = relativePath + Path.SEPARATOR + path.getName();
+                    }
+
+                    if (fileFilterPattern != null && !fileFilterPattern.matcher(pathToCompare).matches()) {
+                        return false;
+                    }
+                    return true;
+                }
+
+            };
+        }
+    }
 }
\ No newline at end of file
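
The ProcessorConfiguration constructor above now calls
evaluateAttributeExpressions() on the input and output directory properties,
which is what the updated tests exercise: the mock framework asserts by
default that properties supporting expression language are evaluated, and the
old tests had to suppress that check. A hedged sketch of the pattern, reusing
the TestableMoveHDFS helper and kerberosProperties field from the test class
below (paths are the existing test resources):

    // With expression-language validation left on (the TestRunner default),
    // this run only passes because the processor now evaluates the directory
    // properties; runner.setValidateExpressionUsage(false) is no longer needed.
    TestRunner runner = TestRunners.newTestRunner(new TestableMoveHDFS(kerberosProperties));
    runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, "src/test/resources/testdata");
    runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, "src/test/resources/testdataoutput");
    runner.enqueue(new byte[0]);
    runner.run();
    runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);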

http://git-wip-us.apache.org/repos/asf/nifi/blob/600586d6/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
index 6aecdd3..c55a2b1 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
@@ -1,15 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.nifi.processors.hadoop;
 
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-
+import org.apache.commons.io.FileUtils;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.processor.ProcessContext;
@@ -23,173 +30,222 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class MoveHDFSTest {
 
-       private static final String OUTPUT_DIRECTORY = 
"src/test/resources/testdataoutput";
-       private static final String INPUT_DIRECTORY = 
"src/test/resources/testdata";
-       private static final String DOT_FILE_PATH = 
"src/test/resources/testdata/.testfordotfiles";
-       private NiFiProperties mockNiFiProperties;
-       private KerberosProperties kerberosProperties;
-
-       @Before
-       public void setup() {
-               mockNiFiProperties = mock(NiFiProperties.class);
-               
when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
-               kerberosProperties = new KerberosProperties(null);
-       }
-
-       @After
-       public void teardown() {
-               File outputDirectory = new File(OUTPUT_DIRECTORY);
-               if (outputDirectory.exists()) {
-                       if (outputDirectory.isDirectory()) {
-                               moveFilesFromOutputDirectoryToInput();
-                       }
-                       outputDirectory.delete();
-               }
-               removeDotFile();
-       }
-
-       private void removeDotFile() {
-               File dotFile = new File(DOT_FILE_PATH);
-               if (dotFile.exists()) {
-                       dotFile.delete();
-               }
-       }
-
-       private void moveFilesFromOutputDirectoryToInput() {
-               File folder = new File(OUTPUT_DIRECTORY);
-               for (File file : folder.listFiles()) {
-                       if (file.isFile()) {
-                               String path = file.getAbsolutePath();
-                               if(!path.endsWith(".crc")) {
-                                       String newPath = path.replaceAll("testdataoutput", "testdata");
-                                       File newFile = new File(newPath);
-                                       if (!newFile.exists()) {
-                                               file.renameTo(newFile);
-                                       }
-                               } else {
-                                       file.delete();
-                               }
-                       }
-               }
-       }
-
-       @Test
-       public void testOutputDirectoryValidator() {
-               MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
-               TestRunner runner = TestRunners.newTestRunner(proc);
-               Collection<ValidationResult> results;
-               ProcessContext pc;
-
-               results = new HashSet<>();
-               runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, "/source");
-               runner.enqueue(new byte[0]);
-               pc = runner.getProcessContext();
-               if (pc instanceof MockProcessContext) {
-                       results = ((MockProcessContext) pc).validate();
-               }
-               Assert.assertEquals(1, results.size());
-               for (ValidationResult vr : results) {
-                       assertTrue(vr.toString().contains("Output Directory is required"));
-               }
-       }
-
-       @Test
-       public void testBothInputAndOutputDirectoriesAreValid() {
-               MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
-               TestRunner runner = TestRunners.newTestRunner(proc);
-               Collection<ValidationResult> results;
-               ProcessContext pc;
-
-               results = new HashSet<>();
-               runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
-               runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
-               runner.enqueue(new byte[0]);
-               pc = runner.getProcessContext();
-               if (pc instanceof MockProcessContext) {
-                       results = ((MockProcessContext) pc).validate();
-               }
-               Assert.assertEquals(0, results.size());
-       }
-
-       @Test
-       public void testOnScheduledShouldRunCleanly() {
-               MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
-               TestRunner runner = TestRunners.newTestRunner(proc);
-               runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
-               runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
-               runner.enqueue(new byte[0]);
-               runner.setValidateExpressionUsage(false);
-               runner.run();
-               List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
-               runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
-               Assert.assertEquals(7, flowFiles.size());
-       }
-       
-       @Test
-       public void testDotFileFilter() throws IOException {
-               createDotFile();
-               MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
-               TestRunner runner = TestRunners.newTestRunner(proc);
-               runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
-               runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
-               runner.setProperty(MoveHDFS.IGNORE_DOTTED_FILES, "false");
-               runner.enqueue(new byte[0]);
-               runner.setValidateExpressionUsage(false);
-               runner.run();
-               List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
-               runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
-               Assert.assertEquals(8, flowFiles.size());
-       }
-       
-       @Test
-       public void testFileFilterRegex() {
-               MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
-               TestRunner runner = TestRunners.newTestRunner(proc);
-               runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
-               runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
-               runner.setProperty(MoveHDFS.FILE_FILTER_REGEX, ".*\\.gz");
-               runner.enqueue(new byte[0]);
-               runner.setValidateExpressionUsage(false);
-               runner.run();
-               List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
-               runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
-               Assert.assertEquals(1, flowFiles.size());
-       }
-       
-       @Test
-       public void testSingleFileAsInput() {
-               MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
-               TestRunner runner = TestRunners.newTestRunner(proc);
-               runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY + "/randombytes-1");
-               runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
-               runner.enqueue(new byte[0]);
-               runner.setValidateExpressionUsage(false);
-               runner.run();
-               List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
-               runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
-               Assert.assertEquals(1, flowFiles.size());
-       }
-
-       private void createDotFile() throws IOException {
-               File dotFile = new File(DOT_FILE_PATH);
-               dotFile.createNewFile();
-       }
-
-       private static class TestableMoveHDFS extends MoveHDFS {
-
-               private KerberosProperties testKerberosProperties;
-
-               public TestableMoveHDFS(KerberosProperties testKerberosProperties) {
-                       this.testKerberosProperties = testKerberosProperties;
-               }
-
-               @Override
-               protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
-                       return testKerberosProperties;
-               }
-
-       }
+    private static final String OUTPUT_DIRECTORY = "target/test-data-output";
+    private static final String TEST_DATA_DIRECTORY = "src/test/resources/testdata";
+    private static final String INPUT_DIRECTORY = "target/test-data-input";
+    private NiFiProperties mockNiFiProperties;
+    private KerberosProperties kerberosProperties;
+
+    @Before
+    public void setup() {
+        mockNiFiProperties = mock(NiFiProperties.class);
+        when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
+        kerberosProperties = new KerberosProperties(null);
+    }
+
+    @After
+    public void teardown() {
+        File inputDirectory = new File(INPUT_DIRECTORY);
+        File outputDirectory = new File(OUTPUT_DIRECTORY);
+        if (inputDirectory.exists()) {
+            Assert.assertTrue("Could not delete input directory: " + 
inputDirectory, FileUtils.deleteQuietly(inputDirectory));
+        }
+        if (outputDirectory.exists()) {
+            Assert.assertTrue("Could not delete output directory: " + 
outputDirectory, FileUtils.deleteQuietly(outputDirectory));
+        }
+    }
+
+    @Test
+    public void testOutputDirectoryValidator() {
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        Collection<ValidationResult> results;
+        ProcessContext pc;
+
+        results = new HashSet<>();
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, "/source");
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(1, results.size());
+        for (ValidationResult vr : results) {
+            assertTrue(vr.toString().contains("Output Directory is required"));
+        }
+    }
+
+    @Test
+    public void testBothInputAndOutputDirectoriesAreValid() {
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        Collection<ValidationResult> results;
+        ProcessContext pc;
+
+        results = new HashSet<>();
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        Assert.assertEquals(0, results.size());
+    }
+
+    @Test
+    public void testOnScheduledShouldRunCleanly() throws IOException {
+        FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.enqueue(new byte[0]);
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+        Assert.assertEquals(7, flowFiles.size());
+    }
+
+    @Test
+    public void testDotFileFilterIgnore() throws IOException {
+        FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.IGNORE_DOTTED_FILES, "true");
+        runner.enqueue(new byte[0]);
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+        Assert.assertEquals(7, flowFiles.size());
+        Assert.assertTrue(new File(INPUT_DIRECTORY, ".dotfile").exists());
+    }
+
+    @Test
+    public void testDotFileFilterInclude() throws IOException {
+        FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.IGNORE_DOTTED_FILES, "false");
+        runner.enqueue(new byte[0]);
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+        Assert.assertEquals(8, flowFiles.size());
+    }
+
+    @Test
+    public void testFileFilterRegex() throws IOException {
+        FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.FILE_FILTER_REGEX, ".*\\.gz");
+        runner.enqueue(new byte[0]);
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+    }
+
+    @Test
+    public void testSingleFileAsInputCopy() throws IOException {
+        FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY + "/randombytes-1");
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.OPERATION, "copy");
+        runner.enqueue(new byte[0]);
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+        Assert.assertTrue(new File(INPUT_DIRECTORY, "randombytes-1").exists());
+        Assert.assertTrue(new File(OUTPUT_DIRECTORY, "randombytes-1").exists());
+    }
+
+    @Test
+    public void testSingleFileAsInputMove() throws IOException {
+        FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY + "/randombytes-1");
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.enqueue(new byte[0]);
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+        Assert.assertFalse(new File(INPUT_DIRECTORY, "randombytes-1").exists());
+        Assert.assertTrue(new File(OUTPUT_DIRECTORY, "randombytes-1").exists());
+    }
+
+    @Test
+    public void testDirectoryWithSubDirectoryAsInputMove() throws IOException {
+        FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
+        File subdir = new File(INPUT_DIRECTORY, "subdir");
+        FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), subdir);
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.enqueue(new byte[0]);
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+        Assert.assertEquals(7, flowFiles.size());
+        Assert.assertTrue(new File(INPUT_DIRECTORY).exists());
+        Assert.assertTrue(subdir.exists());
+    }
+
+    @Test
+    public void testEmptyInputDirectory() throws IOException {
+        MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        Files.createDirectories(Paths.get(INPUT_DIRECTORY));
+        runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+        runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+        runner.enqueue(new byte[0]);
+        Assert.assertEquals(0, Files.list(Paths.get(INPUT_DIRECTORY)).count());
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
+        runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
+        Assert.assertEquals(0, flowFiles.size());
+    }
+
+    private static class TestableMoveHDFS extends MoveHDFS {
+
+        private KerberosProperties testKerberosProperties;
+
+        public TestableMoveHDFS(KerberosProperties testKerberosProperties) {
+            this.testKerberosProperties = testKerberosProperties;
+        }
+
+        @Override
+        protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
+            return testKerberosProperties;
+        }
+
+    }
 
 }
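
One side note on the new testEmptyInputDirectory test above: Files.list opens
a directory stream that is released only when the returned Stream is closed,
so an unclosed stream can hold the directory handle open on some platforms.
A minimal sketch (not part of this commit) of the same count written with
try-with-resources:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.stream.Stream;
    import org.junit.Assert;

    // Count the entries in the input directory, closing the underlying
    // directory stream deterministically when the block exits.
    try (Stream<Path> entries = Files.list(Paths.get("target/test-data-input"))) {
        Assert.assertEquals(0, entries.count());
    }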

http://git-wip-us.apache.org/repos/asf/nifi/blob/600586d6/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata/.dotfile
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata/.dotfile b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata/.dotfile
new file mode 100644
index 0000000..e69de29
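
For context on what the dotted-file tests exercise: Hadoop-based processors
commonly skip names beginning with '.' via a PathFilter. The sketch below is
illustrative only, not the actual MoveHDFS implementation (that lives in the
main-source diff above); the ignoreDotted flag is a stand-in for the value of
the IGNORE_DOTTED_FILES property.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    // Hypothetical filter: reject dotted files when ignoreDotted is true,
    // accept everything otherwise.
    final boolean ignoreDotted = true;
    final PathFilter filter = new PathFilter() {
        @Override
        public boolean accept(final Path path) {
            return !(ignoreDotted && path.getName().startsWith("."));
        }
    };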
