NIFI-673: Added Completion Strategy to FetchSFTP
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b0322d9f Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b0322d9f Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b0322d9f Branch: refs/heads/NIFI-655 Commit: b0322d9ffe8d117aae4faf7dd3e2881a28940f96 Parents: d1d5793 Author: Mark Payne <[email protected]> Authored: Mon Oct 5 16:11:40 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Sun Oct 25 11:13:02 2015 -0400 ---------------------------------------------------------------------- .../standard/AbstractListProcessor.java | 38 +++--- .../processors/standard/FetchFileTransfer.java | 66 ++++++++-- .../nifi/processors/standard/FetchSFTP.java | 15 ++- .../processors/standard/ListFileTransfer.java | 8 ++ .../nifi/processors/standard/ListSFTP.java | 7 +- .../processors/standard/util/EntityListing.java | 2 +- .../processors/standard/util/FTPTransfer.java | 26 ++-- .../processors/standard/util/FileTransfer.java | 2 + .../processors/standard/util/SFTPTransfer.java | 17 +++ .../standard/TestFetchFileTransfer.java | 131 +++++++++++++++++++ 10 files changed, 265 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java index 8a7fade..e592483 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java @@ -70,23 +70,25 @@ import org.codehaus.jackson.map.ObjectMapper; * * <p> * In order to make use of this abstract class, the entities listed must meet the following criteria: - * <ul> - * <li> - * Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is - * returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled. - * </li> - * <li> - * Entity must have a unique identifier. This is used in conjunction with the timestamp in order to determine whether or not the entity is - * new. If the timestamp of an entity is before the latest timestamp pulled, then the entity is not considered new. If the timestamp is later - * than the last timestamp pulled, then the entity is considered new. If the timestamp is equal to the latest timestamp pulled, then the entity's - * identifier is compared to all of the entity identifiers that have that same timestamp in order to determine whether or not the entity has been - * seen already. - * </li> - * <li> - * Entity must have a user-readable name that can be used for logging purposes. - * </li> * </p> * + * <ul> + * <li> + * Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is + * returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled. + * </li> + * <li> + * Entity must have a unique identifier. This is used in conjunction with the timestamp in order to determine whether or not the entity is + * new. If the timestamp of an entity is before the latest timestamp pulled, then the entity is not considered new. If the timestamp is later + * than the last timestamp pulled, then the entity is considered new. If the timestamp is equal to the latest timestamp pulled, then the entity's + * identifier is compared to all of the entity identifiers that have that same timestamp in order to determine whether or not the entity has been + * seen already. + * </li> + * <li> + * Entity must have a user-readable name that can be used for logging purposes. + * </li> + * </ul> + * * <p> * This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the remote system. This is performed using * two different mechanisms. First, state is stored locally. This allows the system to be restarted and begin processing where it left off. The state that is @@ -111,6 +113,7 @@ import org.codehaus.jackson.map.ObjectMapper; * * <p> * Subclasses are responsible for the following: + * </p> * * <ul> * <li> @@ -134,7 +137,6 @@ import org.codehaus.jackson.map.ObjectMapper; * a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared. * </li> * </ul> - * </p> */ @TriggerSerially public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor { @@ -372,8 +374,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab int listCount = 0; Long latestListingTimestamp = null; for (final T entity : entityList) { - final boolean list = (minTimestamp == null || entity.getTimestamp() > minTimestamp || - (entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier()))); + final boolean list = (minTimestamp == null || entity.getTimestamp() > minTimestamp + || (entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier()))); // Create the FlowFile for this path. if (list) { http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java index 5eecac3..ab0be78 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java @@ -31,7 +31,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -49,9 +51,18 @@ import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.Tuple; /** - * A base class for FetchSFTP, FetchFTP processors + * A base class for FetchSFTP, FetchFTP processors. + * + * Note that implementations of this class should never use the @SupportsBatching annotation! Doing so + * could result in data loss! */ public abstract class FetchFileTransfer extends AbstractProcessor { + + static final AllowableValue COMPLETION_NONE = new AllowableValue("None", "None", "Leave the file as-is"); + static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move File", "Move File", "Move the file to the directory specified by the <Move Destination Directory> property"); + static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete File", "Delete File", "Deletes the original file from the remote system"); + + static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() .name("Hostname") .description("The fully-qualified hostname or IP address of the host to fetch the data from") @@ -73,13 +84,25 @@ public abstract class FetchFileTransfer extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) .build(); - public static final PropertyDescriptor DELETE_ORIGINAL = new PropertyDescriptor.Builder() - .name("Delete Original") - .description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred") - .defaultValue("true") - .allowableValues("true", "false") + static final PropertyDescriptor COMPLETION_STRATEGY = new PropertyDescriptor.Builder() + .name("Completion Strategy") + .description("Specifies what to do with the original file on the server once it has been pulled into NiFi. If the Completion Strategy fails, a warning will be " + + "logged but the data will still be transferred.") + .expressionLanguageSupported(false) + .allowableValues(COMPLETION_NONE, COMPLETION_MOVE, COMPLETION_DELETE) + .defaultValue(COMPLETION_NONE.getValue()) .required(true) .build(); + static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder() + .name("Move Destination Directory") + .description("The directory on the remote server to the move the original file to once it has been ingested into NiFi. " + + "This property is ignored unless the Completion Strategy is set to \"Move File\". The specified directory must already exist on" + + "the remote system, or the rename will fail.") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -156,7 +179,8 @@ public abstract class FetchFileTransfer extends AbstractProcessor { properties.add(HOSTNAME); properties.add(UNDEFAULTED_PORT); properties.add(REMOTE_FILENAME); - properties.add(DELETE_ORIGINAL); + properties.add(COMPLETION_STRATEGY); + properties.add(MOVE_DESTINATION_DIR); return properties; } @@ -203,6 +227,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor { final InputStream in; try { in = transfer.getInputStream(filename, flowFile); + flowFile = session.write(flowFile, new OutputStreamCallback() { @Override public void process(final OutputStream out) throws IOException { @@ -250,15 +275,34 @@ public abstract class FetchFileTransfer extends AbstractProcessor { stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); - // delete remote file is necessary - final boolean deleteOriginal = context.getProperty(DELETE_ORIGINAL).asBoolean(); - if (deleteOriginal) { + // it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where + // we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would + // result in data loss! If we commit the session first, we are safe. + session.commit(); + + final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue(); + if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) { try { transfer.deleteFile(null, filename); } catch (final FileNotFoundException e) { // file doesn't exist -- effectively the same as removing it. Move on. } catch (final IOException ioe) { - getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}", new Object[] {flowFile, host, port, filename, ioe}, ioe); + getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}", + new Object[] {flowFile, host, port, filename, ioe}, ioe); + } + } else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) { + String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue(); + if (!targetDir.endsWith("/")) { + targetDir = targetDir + "/"; + } + final String simpleFilename = StringUtils.substringAfterLast(filename, "/"); + final String target = targetDir + simpleFilename; + + try { + transfer.rename(filename, target); + } catch (final IOException ioe) { + getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}", + new Object[] {flowFile, host, port, filename, ioe}, ioe); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java index 6387e19..ad81c83 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -35,8 +34,7 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.util.SFTPTransfer; - -@SupportsBatching +// Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted. @Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"}) @CapabilityDescription("Fetches the content of a file from a remote SFTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.") @SeeAlso({GetSFTP.class, PutSFTP.class, GetFTP.class, PutFTP.class}) @@ -50,15 +48,18 @@ public class FetchSFTP extends FetchFileTransfer { @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build(); + final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(FetchFileTransfer.HOSTNAME); - properties.add(SFTPTransfer.PORT); + properties.add(HOSTNAME); + properties.add(port); properties.add(SFTPTransfer.USERNAME); properties.add(SFTPTransfer.PASSWORD); properties.add(SFTPTransfer.PRIVATE_KEY_PATH); properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE); - properties.add(FetchFileTransfer.REMOTE_FILENAME); - properties.add(SFTPTransfer.DELETE_ORIGINAL); + properties.add(REMOTE_FILENAME); + properties.add(COMPLETION_STRATEGY); + properties.add(MOVE_DESTINATION_DIR); properties.add(SFTPTransfer.CONNECTION_TIMEOUT); properties.add(SFTPTransfer.DATA_TIMEOUT); properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT); http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java index d6e1cd1..1176fd0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java @@ -38,6 +38,13 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> { .required(true) .expressionLanguageSupported(true) .build(); + static final PropertyDescriptor UNDEFAULTED_PORT = new PropertyDescriptor.Builder() + .name("Port") + .description("The port to connect to on the remote host to fetch the data from") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(true) + .build(); public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder() .name("Remote Path") .description("The path on the remote system from which to pull or push files") @@ -52,6 +59,7 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> { protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) { final Map<String, String> attributes = new HashMap<>(); attributes.put(getProtocolName() + ".remote.host", context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue()); + attributes.put(getProtocolName() + ".remote.port", context.getProperty(UNDEFAULTED_PORT).evaluateAttributeExpressions().getValue()); attributes.put("file.owner", fileInfo.getOwner()); attributes.put("file.group", fileInfo.getGroup()); attributes.put("file.permissions", fileInfo.getPermissions()); http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java index 3b6b69e..925e5f8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java @@ -38,6 +38,7 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer; @SeeAlso({FetchSFTP.class, GetSFTP.class, PutSFTP.class}) @WritesAttributes({ @WritesAttribute(attribute = "sftp.remote.host", description = "The hostname of the SFTP Server"), + @WritesAttribute(attribute = "sftp.remote.port", description = "The port that was connected to on the SFTP Server"), @WritesAttribute(attribute = "file.owner", description = "The numeric owner id of the source file"), @WritesAttribute(attribute = "file.group", description = "The numeric group id of the source file"), @WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the source file"), @@ -48,9 +49,11 @@ public class ListSFTP extends ListFileTransfer { @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build(); + final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(SFTPTransfer.HOSTNAME); - properties.add(SFTPTransfer.PORT); + properties.add(HOSTNAME); + properties.add(port); properties.add(SFTPTransfer.USERNAME); properties.add(SFTPTransfer.PASSWORD); properties.add(SFTPTransfer.PRIVATE_KEY_PATH); http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java index 56489f0..2d9525f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java @@ -61,7 +61,7 @@ public class EntityListing { /** * Sets the Collection containing the identifiers of all entities in the listing whose Timestamp was * equal to {@link #getLatestTimestamp()} - * + * * @param matchingIdentifiers the identifiers that have last modified date matching the latest timestamp */ public void setMatchingIdentifiers(Collection<String> matchingIdentifiers) { http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java index 7f659d4..a038eb7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java @@ -135,7 +135,7 @@ public class FTPTransfer implements FileTransfer { client.disconnect(); } } catch (final Exception ex) { - logger.warn("Failed to close FTPClient due to {}", new Object[] { ex.toString() }, ex); + logger.warn("Failed to close FTPClient due to {}", new Object[] {ex.toString()}, ex); } client = null; } @@ -334,9 +334,9 @@ public class FTPTransfer implements FileTransfer { final boolean cdSuccessful = setWorkingDirectory(remoteDirectory); if (!cdSuccessful) { - logger.debug("Remote Directory {} does not exist; creating it", new Object[] { remoteDirectory }); + logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory}); if (client.makeDirectory(remoteDirectory)) { - logger.debug("Created {}", new Object[] { remoteDirectory }); + logger.debug("Created {}", new Object[] {remoteDirectory}); } else { throw new IOException("Failed to create remote directory " + remoteDirectory); } @@ -392,10 +392,10 @@ public class FTPTransfer implements FileTransfer { final String time = outformat.format(fileModifyTime); if (!client.setModificationTime(tempFilename, time)) { // FTP server probably doesn't support MFMT command - logger.warn("Could not set lastModifiedTime on {} to {}", new Object[] { flowFile, lastModifiedTime }); + logger.warn("Could not set lastModifiedTime on {} to {}", new Object[] {flowFile, lastModifiedTime}); } } catch (final Exception e) { - logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] { flowFile, lastModifiedTime, e }); + logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] {flowFile, lastModifiedTime, e}); } } final String permissions = ctx.getProperty(PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue(); @@ -404,17 +404,17 @@ public class FTPTransfer implements FileTransfer { int perms = numberPermissions(permissions); if (perms >= 0) { if (!client.sendSiteCommand("chmod " + Integer.toOctalString(perms) + " " + tempFilename)) { - logger.warn("Could not set permission on {} to {}", new Object[] { flowFile, permissions }); + logger.warn("Could not set permission on {} to {}", new Object[] {flowFile, permissions}); } } } catch (final Exception e) { - logger.error("Failed to set permission on {} to {} due to {}", new Object[] { flowFile, permissions, e }); + logger.error("Failed to set permission on {} to {} due to {}", new Object[] {flowFile, permissions, e}); } } if (!filename.equals(tempFilename)) { try { - logger.debug("Renaming remote path from {} to {} for {}", new Object[] { tempFilename, filename, flowFile }); + logger.debug("Renaming remote path from {} to {} for {}", new Object[] {tempFilename, filename, flowFile}); final boolean renameSuccessful = client.rename(tempFilename, filename); if (!renameSuccessful) { throw new IOException("Failed to rename temporary file " + tempFilename + " to " + fullPath + " due to: " + client.getReplyString()); @@ -432,6 +432,16 @@ public class FTPTransfer implements FileTransfer { return fullPath; } + + @Override + public void rename(final String source, final String target) throws IOException { + final FTPClient client = getClient(null); + final boolean renameSuccessful = client.rename(source, target); + if (!renameSuccessful) { + throw new IOException("Failed to rename temporary file " + source + " to " + target + " due to: " + client.getReplyString()); + } + } + @Override public void deleteFile(final String path, final String remoteFileName) throws IOException { final FTPClient client = getClient(null); http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java index fe277df..8d48de2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java @@ -43,6 +43,8 @@ public interface FileTransfer extends Closeable { String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException; + void rename(String source, String target) throws IOException; + void deleteFile(String path, String remoteFileName) throws IOException; void deleteDirectory(String remoteDirectoryName) throws IOException; http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java index c28f275..9bad520 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java @@ -603,6 +603,23 @@ public class SFTPTransfer implements FileTransfer { return fullPath; } + @Override + public void rename(final String source, final String target) throws IOException { + final ChannelSftp sftp = getChannel(null); + try { + sftp.rename(source, target); + } catch (final SftpException e) { + switch (e.id) { + case ChannelSftp.SSH_FX_NO_SUCH_FILE: + throw new FileNotFoundException(); + case ChannelSftp.SSH_FX_PERMISSION_DENIED: + throw new PermissionDeniedException("Could not rename remote file " + source + " to " + target + " due to insufficient permissions"); + default: + throw new IOException(e); + } + } + } + protected int numberPermissions(String perms) { int number = -1; final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$"); http://git-wip-us.apache.org/repos/asf/nifi/blob/b0322d9f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java index 7aa8f9c..4175a77 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java @@ -17,7 +17,9 @@ package org.apache.nifi.processors.standard; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; import java.io.File; @@ -92,8 +94,119 @@ public class TestFetchFileTransfer { runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1); } + + @Test + public void testMoveFileWithNoTrailingSlashDirName() { + final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); + runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); + runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); + runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); + runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved"); + + proc.addContent("hello.txt", "world".getBytes()); + final Map<String, String> attrs = new HashMap<>(); + attrs.put("filename", "hello.txt"); + runner.enqueue(new byte[0], attrs); + + runner.run(1, false, false); + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); + + proc.fileContents.containsKey("/moved/hello.txt"); + assertEquals(1, proc.fileContents.size()); + } + + @Test + public void testMoveFileWithTrailingSlashDirName() { + final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); + runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); + runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); + runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); + runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/"); + + proc.addContent("hello.txt", "world".getBytes()); + final Map<String, String> attrs = new HashMap<>(); + attrs.put("filename", "hello.txt"); + runner.enqueue(new byte[0], attrs); + + runner.run(1, false, false); + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); + + proc.fileContents.containsKey("/moved/hello.txt"); + assertEquals(1, proc.fileContents.size()); + } + + @Test + public void testDeleteFile() { + final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); + runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); + runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); + runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue()); + + proc.addContent("hello.txt", "world".getBytes()); + final Map<String, String> attrs = new HashMap<>(); + attrs.put("filename", "hello.txt"); + runner.enqueue(new byte[0], attrs); + + runner.run(1, false, false); + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); + assertTrue(proc.fileContents.isEmpty()); + } + + @Test + public void testDeleteFails() { + final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); + runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); + runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); + runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_DELETE.getValue()); + + proc.addContent("hello.txt", "world".getBytes()); + final Map<String, String> attrs = new HashMap<>(); + attrs.put("filename", "hello.txt"); + runner.enqueue(new byte[0], attrs); + proc.allowDelete = false; + + runner.run(1, false, false); + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); + assertFalse(proc.fileContents.isEmpty()); + } + + @Test + public void testRenameFails() { + final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); + runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); + runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); + runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); + runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/"); + + proc.addContent("hello.txt", "world".getBytes()); + final Map<String, String> attrs = new HashMap<>(); + attrs.put("filename", "hello.txt"); + runner.enqueue(new byte[0], attrs); + proc.allowDelete = false; + proc.allowRename = false; + + runner.run(1, false, false); + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); + assertEquals(1, proc.fileContents.size()); + + assertTrue(proc.fileContents.containsKey("hello.txt")); + } + + private static class TestableFetchFileTransfer extends FetchFileTransfer { private boolean allowAccess = true; + private boolean allowDelete = true; + private boolean allowRename = true; private boolean closed = false; private final Map<String, byte[]> fileContents = new HashMap<>(); @@ -154,6 +267,10 @@ public class TestFetchFileTransfer { @Override public void deleteFile(String path, String remoteFileName) throws IOException { + if (!allowDelete) { + throw new PermissionDeniedException("test permission denied"); + } + if (!fileContents.containsKey(remoteFileName)) { throw new FileNotFoundException(); } @@ -162,6 +279,20 @@ public class TestFetchFileTransfer { } @Override + public void rename(String source, String target) throws IOException { + if (!allowRename) { + throw new PermissionDeniedException("test permission denied"); + } + + if (!fileContents.containsKey(source)) { + throw new FileNotFoundException(); + } + + final byte[] content = fileContents.remove(source); + fileContents.put(target, content); + } + + @Override public void deleteDirectory(String remoteDirectoryName) throws IOException { }
