NIFI-631: Initial implementation of FetchFile processor

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/58384c78
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/58384c78
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/58384c78

Branch: refs/heads/NIFI-631
Commit: 58384c78dc3ba170490c4aa8461fce9390f5a693
Parents: 7d86313
Author: Mark Payne <[email protected]>
Authored: Mon Oct 5 14:07:15 2015 -0400
Committer: Tony Kurc <[email protected]>
Committed: Tue Nov 24 22:04:47 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/FetchFile.java     | 308 +++++++++++++++++++
 .../nifi/processors/standard/GetFile.java       |   2 +-
 .../nifi/processors/standard/PutFile.java       |   2 +-
 .../org.apache.nifi.processor.Processor         |   1 +
 .../nifi/processors/standard/TestFetchFile.java | 258 ++++++++++++++++
 5 files changed, 569 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/58384c78/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java
new file mode 100644
index 0000000..a350f6f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.CopyOption;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
+
+@Tags({"local", "files", "filesystem", "ingest", "ingress", "get", "source", 
"input"})
+@CapabilityDescription("Reads the contents of a file from disk and streams it 
into the contents of an incoming FlowFile. Once this is done, the file is 
optionally moved elsewhere or deleted "
+    + "to help keep the file system organized.")
+@SeeAlso({GetFile.class, PutFile.class})
+public class FetchFile extends AbstractProcessor {
+    static final AllowableValue COMPLETION_NONE = new AllowableValue("None", 
"None", "Leave the file as-is");
+    static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move 
File", "Move File", "Moves the file to the directory specified by the <Move 
Destination Directory> property");
+    static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete 
File", "Delete File", "Deletes the original file from the file system");
+
+    static final AllowableValue CONFLICT_REPLACE = new AllowableValue("Replace 
File", "Replace File", "The newly ingested file should replace the existing 
file in the Destination Directory");
+    static final AllowableValue CONFLICT_KEEP_INTACT = new 
AllowableValue("Keep Existing", "Keep Existing", "The existing file should in 
the Destination Directory should stay intact and the newly "
+        + "ingested file should be deleted");
+    static final AllowableValue CONFLICT_FAIL = new AllowableValue("Fail", 
"Fail", "The existing destination file should remain intact and the incoming 
FlowFile should be routed to failure");
+    static final AllowableValue CONFLICT_RENAME = new AllowableValue("Rename", 
"Rename", "The existing destination file should remain intact. The newly 
ingested file should be moved to the "
+        + "destination directory but be renamed to a random filename");
+
+    static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
+        .name("File to Fetch")
+        .description("The fully-qualified filename of the file to fetch from 
the file system")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .defaultValue("${path}/${filename}")
+        .required(true)
+        .build();
+    static final PropertyDescriptor COMPLETION_STRATEGY = new 
PropertyDescriptor.Builder()
+        .name("Completion Strategy")
+        .description("Specifies what to do with the original file on the file 
system once it has been pulled into NiFi")
+        .expressionLanguageSupported(false)
+        .allowableValues(COMPLETION_NONE, COMPLETION_MOVE, COMPLETION_DELETE)
+        .defaultValue(COMPLETION_NONE.getValue())
+        .required(true)
+        .build();
+    static final PropertyDescriptor MOVE_DESTINATION_DIR = new 
PropertyDescriptor.Builder()
+        .name("Move Destination Directory")
+        .description("The directory to the move the original file to once it 
has been fetched from the file system. This property is ignored unless the 
Completion Strategy is set to \"Move File\". "
+            + "If the directory does not exist, it will be created.")
+        .expressionLanguageSupported(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .required(false)
+        .build();
+    static final PropertyDescriptor CONFLICT_STRATEGY = new 
PropertyDescriptor.Builder()
+        .name("Move Conflict Strategy")
+        .description("If Completion Strategy is set to Move File and a file 
already exists in the destination directory with the same name, this property 
specifies "
+            + "how that naming conflict should be resolved")
+        .allowableValues(CONFLICT_RENAME, CONFLICT_REPLACE, 
CONFLICT_KEEP_INTACT, CONFLICT_FAIL)
+        .defaultValue(CONFLICT_RENAME.getValue())
+        .required(true)
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("Any FlowFile that is successfully fetched from the file 
system will be transferred to this Relationship.")
+        .build();
+    static final Relationship REL_NOT_FOUND = new Relationship.Builder()
+        .name("not.found")
+        .description("Any FlowFile that could not be fetched from the file 
system because the file could not be found will be transferred to this 
Relationship.")
+        .build();
+    static final Relationship REL_PERMISSION_DENIED = new 
Relationship.Builder()
+        .name("permission.denied")
+        .description("Any FlowFile that could not be fetched from the file 
system due to the user running NiFi not having sufficient permissions will be 
transferred to this Relationship.")
+        .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description(
+            "Any FlowFile that could not be fetched from the file system for 
any reason other than insufficient permissions or the file not existing will be 
transferred to this Relationship.")
+        .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(FILENAME);
+        properties.add(COMPLETION_STRATEGY);
+        properties.add(MOVE_DESTINATION_DIR);
+        properties.add(CONFLICT_STRATEGY);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_NOT_FOUND);
+        relationships.add(REL_PERMISSION_DENIED);
+        relationships.add(REL_FAILURE);
+        return relationships;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        if 
(COMPLETION_MOVE.getValue().equalsIgnoreCase(validationContext.getProperty(COMPLETION_STRATEGY).getValue()))
 {
+            if (!validationContext.getProperty(MOVE_DESTINATION_DIR).isSet()) {
+                results.add(new 
ValidationResult.Builder().subject(MOVE_DESTINATION_DIR.getName()).input(null).valid(false).explanation(
+                    MOVE_DESTINATION_DIR.getName() + " must be specified if " 
+ COMPLETION_STRATEGY.getName() + " is set to " + 
COMPLETION_MOVE.getDisplayName()).build());
+            }
+        }
+
+        return results;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final StopWatch stopWatch = new StopWatch(true);
+        final String filename = 
context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
+        final File file = new File(filename);
+
+        // Verify that file exists
+        if (!file.exists()) {
+            getLogger().error("Could not fetch file {} from file system for {} 
because the file does not exist; routing to not.found", new Object[] {file, 
flowFile});
+            session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
+            session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
+            return;
+        }
+
+        // Verify read permission on file
+        final String user = System.getProperty("user.name");
+        if (!isReadable(file)) {
+            getLogger().error("Could not fetch file {} from file system for {} 
due to user {} not having sufficient permissions to read the file; routing to 
permission.denied",
+                new Object[] {file, flowFile, user});
+            session.getProvenanceReporter().route(flowFile, 
REL_PERMISSION_DENIED);
+            session.transfer(session.penalize(flowFile), 
REL_PERMISSION_DENIED);
+            return;
+        }
+
+        // If configured to move the file and fail if unable to do so, check 
that the existing file does not exist and that we have write permissions
+        // for the parent file.
+        final String completionStrategy = 
context.getProperty(COMPLETION_STRATEGY).getValue();
+        final String targetDirectoryName = 
context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
+        if (targetDirectoryName != null) {
+            final File targetDir = new File(targetDirectoryName);
+            if 
(COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
+                if (targetDir.exists() && (!isWritable(targetDir) || 
!isDirectory(targetDir))) {
+                    getLogger().error("Could not fetch file {} from file 
system for {} because Completion Strategy is configured to move the original 
file to {}, "
+                        + "but that is not a directory or user {} does not 
have permissions to write to that directory",
+                        new Object[] {file, flowFile, targetDir, user});
+                    session.transfer(flowFile, REL_FAILURE);
+                    return;
+                }
+
+                final String conflictStrategy = 
context.getProperty(CONFLICT_STRATEGY).getValue();
+
+                if 
(CONFLICT_FAIL.getValue().equalsIgnoreCase(conflictStrategy)) {
+                    final File targetFile = new File(targetDir, 
file.getName());
+                    if (targetFile.exists()) {
+                        getLogger().error("Could not fetch file {} from file 
system for {} because Completion Strategy is configured to move the original 
file to {}, "
+                            + "but a file with name {} already exists in that 
directory and the Move Conflict Strategy is configured for failure",
+                            new Object[] {file, flowFile, targetDir, 
file.getName()});
+                        session.transfer(flowFile, REL_FAILURE);
+                        return;
+                    }
+                }
+            }
+        }
+
+        // import content from file system
+        try (final FileInputStream fis = new FileInputStream(file)) {
+            flowFile = session.importFrom(fis, flowFile);
+        } catch (final IOException ioe) {
+            getLogger().error("Could not fetch file {} from file system for {} 
due to {}; routing to failure", new Object[] {file, flowFile, ioe.toString()}, 
ioe);
+            session.transfer(session.penalize(flowFile), REL_FAILURE);
+            return;
+        }
+
+        session.getProvenanceReporter().modifyContent(flowFile, "Replaced 
content of FlowFile with contents of " + file.toURI(), 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        session.transfer(flowFile, REL_SUCCESS);
+
+        // It is critical that we commit the session before we perform the 
Completion Strategy. Otherwise, we could have a case where we
+        // ingest the file, delete/move the file, and then NiFi is restarted 
before the session is committed. That would result in data loss.
+        // As long as we commit the session right here, before we perform the 
Completion Strategy, we are safe.
+        session.commit();
+
+        // Attempt to perform the Completion Strategy action
+        Exception completionFailureException = null;
+        if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) 
{
+            // convert to path and use Files.delete instead of file.delete so 
that if we fail, we know why
+            try {
+                delete(file);
+            } catch (final IOException ioe) {
+                completionFailureException = ioe;
+            }
+        } else if 
(COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
+            final File targetDirectory = new File(targetDirectoryName);
+            final File targetFile = new File(targetDirectory, file.getName());
+            try {
+                if (targetFile.exists()) {
+                    final String conflictStrategy = 
context.getProperty(CONFLICT_STRATEGY).getValue();
+                    if 
(CONFLICT_KEEP_INTACT.getValue().equalsIgnoreCase(conflictStrategy)) {
+                        // don't move, just delete the original
+                        Files.delete(file.toPath());
+                    } else if 
(CONFLICT_RENAME.getValue().equalsIgnoreCase(conflictStrategy)) {
+                        // rename to add a random UUID but keep the file 
extension if it has one.
+                        final String simpleFilename = targetFile.getName();
+                        final String newName;
+                        if (simpleFilename.contains(".")) {
+                            newName = 
StringUtils.substringBeforeLast(simpleFilename, ".") + "-" + 
UUID.randomUUID().toString() + "." + 
StringUtils.substringAfterLast(simpleFilename, ".");
+                        } else {
+                            newName = simpleFilename + "-" + 
UUID.randomUUID().toString();
+                        }
+
+                        move(file, new File(targetDirectory, newName), false);
+                    } else if 
(CONFLICT_REPLACE.getValue().equalsIgnoreCase(conflictStrategy)) {
+                        move(file, targetFile, true);
+                    }
+                } else {
+                    move(file, targetFile, false);
+                }
+            } catch (final IOException ioe) {
+                completionFailureException = ioe;
+            }
+        }
+
+        // Handle completion failures
+        if (completionFailureException != null) {
+            getLogger().warn("Successfully fetched the content from {} for {} 
but failed to perform Completion Action due to {}; routing to success",
+                new Object[] {file, flowFile, completionFailureException}, 
completionFailureException);
+        }
+    }
+
+
+    //
+    // The following set of methods exist purely for testing purposes
+    //
+    protected void move(final File source, final File target, final boolean 
overwrite) throws IOException {
+        final File targetDirectory = target.getParentFile();
+
+        // convert to path and use Files.move instead of file.renameTo so that 
if we fail, we know why
+        final Path targetPath = target.toPath();
+        if (!targetDirectory.exists()) {
+            Files.createDirectories(targetDirectory.toPath());
+        }
+
+        final CopyOption[] copyOptions = overwrite ? new CopyOption[] 
{StandardCopyOption.REPLACE_EXISTING} : new CopyOption[] {};
+        Files.move(source.toPath(), targetPath, copyOptions);
+    }
+
+    protected void delete(final File file) throws IOException {
+        Files.delete(file.toPath());
+    }
+
+    protected boolean isReadable(final File file) {
+        return file.canRead();
+    }
+
+    protected boolean isWritable(final File file) {
+        return file.canWrite();
+    }
+
+    protected boolean isDirectory(final File file) {
+        return file.isDirectory();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/58384c78/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
index ced79cd..74b6761 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
@@ -91,7 +91,7 @@ import org.apache.nifi.processor.util.StandardValidators;
     @WritesAttribute(attribute = "file.permissions", description = "The 
read/write/execute permissions of the file. May not work on all file systems"),
     @WritesAttribute(attribute = "absolute.path", description = "The 
full/absolute path from where a file was picked up. The current 'path' "
             + "attribute is still populated, but may be a relative path")})
-@SeeAlso(PutFile.class)
+@SeeAlso({PutFile.class, FetchFile.class})
 public class GetFile extends AbstractProcessor {
 
     public static final PropertyDescriptor DIRECTORY = new 
PropertyDescriptor.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/58384c78/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
index 8c4b00f..d32427e 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
@@ -57,7 +57,7 @@ import org.apache.nifi.util.StopWatch;
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"put", "local", "copy", "archive", "files", "filesystem"})
 @CapabilityDescription("Writes the contents of a FlowFile to the local file 
system")
-@SeeAlso(GetFile.class)
+@SeeAlso({FetchFile.class, GetFile.class})
 public class PutFile extends AbstractProcessor {
 
     public static final String REPLACE_RESOLUTION = "replace";

http://git-wip-us.apache.org/repos/asf/nifi/blob/58384c78/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index eb10431..3572d9b 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -30,6 +30,7 @@ org.apache.nifi.processors.standard.ExecuteStreamCommand
 org.apache.nifi.processors.standard.ExecuteProcess
 org.apache.nifi.processors.standard.ExtractText
 org.apache.nifi.processors.standard.FetchSFTP
+org.apache.nifi.processors.standard.FetchFile
 org.apache.nifi.processors.standard.GenerateFlowFile
 org.apache.nifi.processors.standard.GetFile
 org.apache.nifi.processors.standard.GetFTP

http://git-wip-us.apache.org/repos/asf/nifi/blob/58384c78/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java
new file mode 100644
index 0000000..0e69216
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFetchFile {
+
+    @Before
+    public void prepDestDirectory() throws IOException {
+        final File targetDir = new File("target/move-target");
+        if (!targetDir.exists()) {
+            Files.createDirectories(targetDir.toPath());
+            return;
+        }
+
+        for (final File file : targetDir.listFiles()) {
+            Files.delete(file.toPath());
+        }
+    }
+
+    @Test
+    public void testSimpleSuccess() throws IOException {
+        final File sourceFile = new File("target/1.txt");
+        final byte[] content = "Hello, World!".getBytes();
+        Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE);
+
+        final TestRunner runner = TestRunners.newTestRunner(new FetchFile());
+        runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath());
+        runner.setProperty(FetchFile.COMPLETION_STRATEGY, 
FetchFile.COMPLETION_NONE.getValue());
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(FetchFile.REL_SUCCESS).get(0).assertContentEquals(content);
+
+        assertTrue(sourceFile.exists());
+    }
+
+    @Test
+    public void testDeleteOnComplete() throws IOException {
+        final File sourceFile = new File("target/1.txt");
+        final byte[] content = "Hello, World!".getBytes();
+        Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE);
+
+        final TestRunner runner = TestRunners.newTestRunner(new FetchFile());
+        runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath());
+        runner.setProperty(FetchFile.COMPLETION_STRATEGY, 
FetchFile.COMPLETION_DELETE.getValue());
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(FetchFile.REL_SUCCESS).get(0).assertContentEquals(content);
+
+        assertFalse(sourceFile.exists());
+    }
+
+    @Test
+    public void testMoveOnCompleteWithTargetDirExisting() throws IOException {
+        final File sourceFile = new File("target/1.txt");
+        final byte[] content = "Hello, World!".getBytes();
+        Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE);
+
+        final TestRunner runner = TestRunners.newTestRunner(new FetchFile());
+        runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath());
+        runner.setProperty(FetchFile.COMPLETION_STRATEGY, 
FetchFile.COMPLETION_MOVE.getValue());
+        runner.assertNotValid();
+        runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, 
"target/move-target");
+        runner.assertValid();
+
+        final File destDir = new File("target/move-target");
+        final File destFile = new File(destDir, sourceFile.getName());
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(FetchFile.REL_SUCCESS).get(0).assertContentEquals(content);
+
+        assertFalse(sourceFile.exists());
+        assertTrue(destFile.exists());
+    }
+
+    @Test
+    public void testMoveOnCompleteWithTargetDirMissing() throws IOException {
+        final File sourceFile = new File("target/1.txt");
+        final byte[] content = "Hello, World!".getBytes();
+        Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE);
+
+        final TestRunner runner = TestRunners.newTestRunner(new FetchFile());
+        runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath());
+        runner.setProperty(FetchFile.COMPLETION_STRATEGY, 
FetchFile.COMPLETION_MOVE.getValue());
+        runner.assertNotValid();
+        runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, 
"target/move-target");
+        runner.assertValid();
+
+        final File destDir = new File("target/move-target");
+        final File destFile = new File(destDir, sourceFile.getName());
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(FetchFile.REL_SUCCESS).get(0).assertContentEquals(content);
+
+        assertFalse(sourceFile.exists());
+        assertTrue(destFile.exists());
+    }
+
+    @Test
+    public void testMoveAndReplace() throws IOException {
+        final File sourceFile = new File("target/1.txt");
+        final byte[] content = "Hello, World!".getBytes();
+        Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE);
+
+        final TestRunner runner = TestRunners.newTestRunner(new FetchFile());
+        runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath());
+        runner.setProperty(FetchFile.COMPLETION_STRATEGY, 
FetchFile.COMPLETION_MOVE.getValue());
+        runner.assertNotValid();
+        runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, 
"target/move-target");
+        runner.setProperty(FetchFile.CONFLICT_STRATEGY, 
FetchFile.CONFLICT_REPLACE.getValue());
+        runner.assertValid();
+
+        final File destDir = new File("target/move-target");
+        final File destFile = new File(destDir, sourceFile.getName());
+        Files.write(destFile.toPath(), "Good-bye".getBytes(), 
StandardOpenOption.CREATE);
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(FetchFile.REL_SUCCESS).get(0).assertContentEquals(content);
+
+        final byte[] replacedContent = Files.readAllBytes(destFile.toPath());
+        assertTrue(Arrays.equals(content, replacedContent));
+        assertFalse(sourceFile.exists());
+        assertTrue(destFile.exists());
+    }
+
+    @Test
+    public void testMoveAndKeep() throws IOException {
+        final File sourceFile = new File("target/1.txt");
+        final byte[] content = "Hello, World!".getBytes();
+        Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE);
+
+        final TestRunner runner = TestRunners.newTestRunner(new FetchFile());
+        runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath());
+        runner.setProperty(FetchFile.COMPLETION_STRATEGY, 
FetchFile.COMPLETION_MOVE.getValue());
+        runner.assertNotValid();
+        runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, 
"target/move-target");
+        runner.setProperty(FetchFile.CONFLICT_STRATEGY, 
FetchFile.CONFLICT_KEEP_INTACT.getValue());
+        runner.assertValid();
+
+        final File destDir = new File("target/move-target");
+        final File destFile = new File(destDir, sourceFile.getName());
+
+        final byte[] goodBye = "Good-bye".getBytes();
+        Files.write(destFile.toPath(), goodBye);
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(FetchFile.REL_SUCCESS).get(0).assertContentEquals(content);
+
+        final byte[] replacedContent = Files.readAllBytes(destFile.toPath());
+        assertTrue(Arrays.equals(goodBye, replacedContent));
+        assertFalse(sourceFile.exists());
+        assertTrue(destFile.exists());
+    }
+
+    @Test
+    public void testMoveAndFail() throws IOException {
+        final File sourceFile = new File("target/1.txt");
+        final byte[] content = "Hello, World!".getBytes();
+        Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE);
+
+        final TestRunner runner = TestRunners.newTestRunner(new FetchFile());
+        runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath());
+        runner.setProperty(FetchFile.COMPLETION_STRATEGY, 
FetchFile.COMPLETION_MOVE.getValue());
+        runner.assertNotValid();
+        runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, 
"target/move-target");
+        runner.setProperty(FetchFile.CONFLICT_STRATEGY, 
FetchFile.CONFLICT_FAIL.getValue());
+        runner.assertValid();
+
+        final File destDir = new File("target/move-target");
+        final File destFile = new File(destDir, sourceFile.getName());
+
+        final byte[] goodBye = "Good-bye".getBytes();
+        Files.write(destFile.toPath(), goodBye);
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchFile.REL_FAILURE, 1);
+
+        final byte[] replacedContent = Files.readAllBytes(destFile.toPath());
+        assertTrue(Arrays.equals(goodBye, replacedContent));
+        assertTrue(sourceFile.exists());
+        assertTrue(destFile.exists());
+    }
+
+
+    @Test
+    public void testMoveAndRename() throws IOException {
+        final File sourceFile = new File("target/1.txt");
+        final byte[] content = "Hello, World!".getBytes();
+        Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE);
+
+        final TestRunner runner = TestRunners.newTestRunner(new FetchFile());
+        runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath());
+        runner.setProperty(FetchFile.COMPLETION_STRATEGY, 
FetchFile.COMPLETION_MOVE.getValue());
+        runner.assertNotValid();
+        runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, 
"target/move-target");
+        runner.setProperty(FetchFile.CONFLICT_STRATEGY, 
FetchFile.CONFLICT_RENAME.getValue());
+        runner.assertValid();
+
+        final File destDir = new File("target/move-target");
+        final File destFile = new File(destDir, sourceFile.getName());
+
+        final byte[] goodBye = "Good-bye".getBytes();
+        Files.write(destFile.toPath(), goodBye);
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+
+        final byte[] replacedContent = Files.readAllBytes(destFile.toPath());
+        assertTrue(Arrays.equals(goodBye, replacedContent));
+        assertFalse(sourceFile.exists());
+        assertTrue(destFile.exists());
+
+        assertEquals(2, destDir.list().length);
+    }
+}

Reply via email to