Repository: nifi Updated Branches: refs/heads/master 50f22162b -> a1706d12f
NIFI-3281 - fix for (S)FTP processors when using EL against FFs NIFI-3281 - Review - handle completePendingCommand return and added a unit test for ListFTP NIFI-3281 - Review - Added flow file for EL evaluation in other methods and added unit test for NIFI-3590 This closes #1974. Signed-off-by: Koji Kawamura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a1706d12 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a1706d12 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a1706d12 Branch: refs/heads/master Commit: a1706d12f5bbe824c440a375945dd6038baee4cf Parents: 50f2216 Author: Pierre Villard <[email protected]> Authored: Tue Jul 4 10:48:41 2017 +0200 Committer: Koji Kawamura <[email protected]> Committed: Tue Aug 15 11:24:48 2017 +0900 ---------------------------------------------------------------------- .../processors/standard/FetchFileTransfer.java | 10 ++- .../processors/standard/GetFileTransfer.java | 2 +- .../processors/standard/ListFileTransfer.java | 2 +- .../processors/standard/PutFileTransfer.java | 2 +- .../processors/standard/util/FTPTransfer.java | 19 ++++-- .../processors/standard/util/FileTransfer.java | 8 ++- .../processors/standard/util/SFTPTransfer.java | 13 ++-- .../nifi/processors/standard/TestFTP.java | 68 +++++++++++++++++++- .../standard/TestFetchFileTransfer.java | 11 +++- 9 files changed, 111 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java index 6182b0a..0023c4b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java @@ -239,9 +239,13 @@ public abstract class FetchFileTransfer extends AbstractProcessor { @Override public void process(final OutputStream out) throws IOException { StreamUtils.copy(in, out); - transfer.flush(); } }); + + if(!transfer.flush(flowFile)) { + throw new IOException("completePendingCommand returned false, file transfer failed"); + } + transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime())); } catch (final FileNotFoundException e) { getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}", @@ -297,7 +301,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor { final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue(); if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) { try { - transfer.deleteFile(null, filename); + transfer.deleteFile(flowFile, null, filename); } catch (final FileNotFoundException e) { // file doesn't exist -- effectively the same as removing it. Move on. } catch (final IOException ioe) { @@ -313,7 +317,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor { final String target = targetDir + simpleFilename; try { - transfer.rename(filename, target); + transfer.rename(flowFile, filename, target); } catch (final IOException ioe) { getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}", new Object[] {flowFile, host, port, filename, ioe}, ioe); http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java index 00dfccf..4ce31de 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java @@ -208,7 +208,7 @@ public abstract class GetFileTransfer extends AbstractProcessor { if (deleteOriginal) { try { - transfer.deleteFile(null, file.getFullPathFileName()); + transfer.deleteFile(flowFile, null, file.getFullPathFileName()); } catch (final IOException e) { logger.error("Failed to remove remote file {} due to {}; deleting local copy", new Object[]{file.getFullPathFileName(), e}); http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java index 58e443a..3f35c4e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java @@ -94,7 +94,7 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> { @Override protected String getPath(final ProcessContext context) { - return context.getProperty(REMOTE_PATH).getValue(); + return context.getProperty(REMOTE_PATH).evaluateAttributeExpressions().getValue(); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java index 054d1d8..cbaa9ec 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java @@ -230,7 +230,7 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr logger.warn("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile}); break; case FileTransfer.CONFLICT_RESOLUTION_REPLACE: - transfer.deleteFile(path, fileName); + transfer.deleteFile(flowFile, path, fileName); destinationRelationship = REL_SUCCESS; transferFile = true; penalizeFile = false; http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java index b64a6f8..aeec6c3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java @@ -289,7 +289,7 @@ public class FTPTransfer implements FileTransfer { @Override public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException { - final FTPClient client = getClient(null); + final FTPClient client = getClient(flowFile); InputStream in = client.retrieveFileStream(remoteFileName); if (in == null) { throw new IOException(client.getReplyString()); @@ -304,6 +304,11 @@ public class FTPTransfer implements FileTransfer { } @Override + public boolean flush(final FlowFile flowFile) throws IOException { + return getClient(flowFile).completePendingCommand(); + } + + @Override public FileInfo getRemoteFileInfo(final FlowFile flowFile, String path, String remoteFileName) throws IOException { final FTPClient client = getClient(flowFile); @@ -444,8 +449,8 @@ public class FTPTransfer implements FileTransfer { @Override - public void rename(final String source, final String target) throws IOException { - final FTPClient client = getClient(null); + public void rename(final FlowFile flowFile, final String source, final String target) throws IOException { + final FTPClient client = getClient(flowFile); final boolean renameSuccessful = client.rename(source, target); if (!renameSuccessful) { throw new IOException("Failed to rename temporary file " + source + " to " + target + " due to: " + client.getReplyString()); @@ -453,8 +458,8 @@ public class FTPTransfer implements FileTransfer { } @Override - public void deleteFile(final String path, final String remoteFileName) throws IOException { - final FTPClient client = getClient(null); + public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException { + final FTPClient client = getClient(flowFile); if (path != null) { setWorkingDirectory(path); } @@ -464,8 +469,8 @@ public class FTPTransfer implements FileTransfer { } @Override - public void deleteDirectory(final String remoteDirectoryName) throws IOException { - final FTPClient client = getClient(null); + public void deleteDirectory(final FlowFile flowFile, final String remoteDirectoryName) throws IOException { + final FTPClient client = getClient(flowFile); final boolean success = client.removeDirectory(remoteDirectoryName); if (!success) { throw new IOException("Failed to remove directory " + remoteDirectoryName + " due to " + client.getReplyString()); http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java index 22d9ec5..ac7f728 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java @@ -39,15 +39,17 @@ public interface FileTransfer extends Closeable { void flush() throws IOException; + boolean flush(FlowFile flowFile) throws IOException; + FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException; String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException; - void rename(String source, String target) throws IOException; + void rename(FlowFile flowFile, String source, String target) throws IOException; - void deleteFile(String path, String remoteFileName) throws IOException; + void deleteFile(FlowFile flowFile, String path, String remoteFileName) throws IOException; - void deleteDirectory(String remoteDirectoryName) throws IOException; + void deleteDirectory(FlowFile flowFile, String remoteDirectoryName) throws IOException; boolean isClosed(); http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java index a6a9e4b..bc31ba9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java @@ -309,7 +309,12 @@ public class SFTPTransfer implements FileTransfer { } @Override - public void deleteFile(final String path, final String remoteFileName) throws IOException { + public boolean flush(final FlowFile flowFile) throws IOException { + return true; + } + + @Override + public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException { final String fullPath = (path == null) ? remoteFileName : (path.endsWith("/")) ? path + remoteFileName : path + "/" + remoteFileName; try { sftp.rm(fullPath); @@ -326,7 +331,7 @@ public class SFTPTransfer implements FileTransfer { } @Override - public void deleteDirectory(final String remoteDirectoryName) throws IOException { + public void deleteDirectory(final FlowFile flowFile, final String remoteDirectoryName) throws IOException { try { sftp.rm(remoteDirectoryName); } catch (final SftpException e) { @@ -613,8 +618,8 @@ public class SFTPTransfer implements FileTransfer { } @Override - public void rename(final String source, final String target) throws IOException { - final ChannelSftp sftp = getChannel(null); + public void rename(final FlowFile flowFile, final String source, final String target) throws IOException { + final ChannelSftp sftp = getChannel(flowFile); try { sftp.rename(source, target); } catch (final SftpException e) { http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java index 102931f..d8797dc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java @@ -36,7 +36,6 @@ import org.mockftpserver.fake.filesystem.FileSystem; import org.mockftpserver.fake.filesystem.WindowsFakeFileSystem; import java.io.FileInputStream; - import java.io.IOException; import java.util.Collection; import java.util.HashMap; @@ -167,4 +166,71 @@ public class TestFTP { final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(GetFTP.REL_SUCCESS).get(0); retrievedFile.assertContentEquals("Just some random test test test chocolate"); } + + @Test + public void basicFileFetch() throws IOException { + FileSystem results = fakeFtpServer.getFileSystem(); + + FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2"); + sampleFile.setContents("Just some random test test test chocolate"); + results.add(sampleFile); + + // Check file exists + Assert.assertTrue(results.exists("c:\\data\\randombytes-2")); + + TestRunner runner = TestRunners.newTestRunner(FetchFTP.class); + runner.setProperty(FetchFTP.HOSTNAME, "${host}"); + runner.setProperty(FetchFTP.USERNAME, "${username}"); + runner.setProperty(FTPTransfer.PASSWORD, password); + runner.setProperty(FTPTransfer.PORT, "${port}"); + runner.setProperty(FetchFTP.REMOTE_FILENAME, "c:\\data\\randombytes-2"); + runner.setProperty(FetchFTP.COMPLETION_STRATEGY, FetchFTP.COMPLETION_MOVE); + runner.setProperty(FetchFTP.MOVE_DESTINATION_DIR, "data"); + + + Map<String, String> attrs = new HashMap<String, String>(); + attrs.put("host", "localhost"); + attrs.put("username", username); + attrs.put("port", Integer.toString(ftpPort)); + runner.enqueue("", attrs); + + runner.run(); + + final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0); + retrievedFile.assertContentEquals("Just some random test test test chocolate"); + } + + @Test + public void basicFileList() throws IOException { + FileSystem results = fakeFtpServer.getFileSystem(); + + FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2"); + sampleFile.setContents("Just some random test test test chocolate"); + results.add(sampleFile); + + // Check file exists + Assert.assertTrue(results.exists("c:\\data\\randombytes-2")); + + TestRunner runner = TestRunners.newTestRunner(ListFTP.class); + runner.setProperty(ListFTP.HOSTNAME, "localhost"); + runner.setProperty(ListFTP.USERNAME, username); + runner.setProperty(FTPTransfer.PASSWORD, password); + runner.setProperty(FTPTransfer.PORT, Integer.toString(ftpPort)); + runner.setProperty(ListFTP.REMOTE_PATH, "/"); + runner.assertValid(); + + runner.run(); + + final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0); + runner.assertAllFlowFilesContainAttribute("ftp.remote.host"); + runner.assertAllFlowFilesContainAttribute("ftp.remote.port"); + runner.assertAllFlowFilesContainAttribute("ftp.listing.user"); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_OWNER_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_GROUP_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_SIZE_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE); + retrievedFile.assertAttributeEquals("ftp.listing.user", username); + retrievedFile.assertAttributeEquals("filename", "randombytes-2"); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/a1706d12/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java index 2b78a4b..4965893 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java @@ -282,6 +282,11 @@ public class TestFetchFileTransfer { } @Override + public boolean flush(FlowFile flowFile) throws IOException { + return true; + } + + @Override public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException { return null; } @@ -292,7 +297,7 @@ public class TestFetchFileTransfer { } @Override - public void deleteFile(String path, String remoteFileName) throws IOException { + public void deleteFile(FlowFile flowFile, String path, String remoteFileName) throws IOException { if (!allowDelete) { throw new PermissionDeniedException("test permission denied"); } @@ -305,7 +310,7 @@ public class TestFetchFileTransfer { } @Override - public void rename(String source, String target) throws IOException { + public void rename(FlowFile flowFile, String source, String target) throws IOException { if (!allowRename) { throw new PermissionDeniedException("test permission denied"); } @@ -319,7 +324,7 @@ public class TestFetchFileTransfer { } @Override - public void deleteDirectory(String remoteDirectoryName) throws IOException { + public void deleteDirectory(FlowFile flowFile, String remoteDirectoryName) throws IOException { }
