NIFI-673: Initial implementation of ListSFTP, FetchSFTP
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d1d57931
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d1d57931
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d1d57931

Branch: refs/heads/NIFI-730
Commit: d1d57931bf996a230ab7941cb6c1524286c97606
Parents: 8a80060
Author: Mark Payne <[email protected]>
Authored: Sun Oct 4 15:48:28 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Sun Oct 25 11:13:02 2015 -0400

----------------------------------------------------------------------
 .../standard/AbstractListProcessor.java         | 505 +++++++++++++++++++
 .../processors/standard/FetchFileTransfer.java  | 296 +++++++++++
 .../nifi/processors/standard/FetchSFTP.java     |  89 ++++
 .../processors/standard/ListFileTransfer.java   | 103 ++++
 .../nifi/processors/standard/ListSFTP.java      |  81 +++
 .../processors/standard/util/EntityListing.java |  71 +++
 .../processors/standard/util/FTPTransfer.java   | 135 ++---
 .../nifi/processors/standard/util/FileInfo.java |  18 +-
 .../processors/standard/util/FileTransfer.java  | 335 ++++++------
 .../standard/util/ListableEntity.java           |  40 ++
 .../util/PermissionDeniedException.java         |  32 ++
 .../processors/standard/util/SFTPTransfer.java  | 174 ++++---
 .../org.apache.nifi.processor.Processor         |   2 +
 .../standard/TestAbstractListProcessor.java     | 221 ++++++++
 .../standard/TestFetchFileTransfer.java         | 186 +++++++
 15 files changed, 1988 insertions(+), 300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
new file mode 100644
index 0000000..8a7fade
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import org.apache.nifi.distributed.cache.client.exception.SerializationException;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.util.EntityListing;
+import org.apache.nifi.processors.standard.util.ListableEntity;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * <p>
+ * An Abstract Processor that is intended to simplify the coding required in order to perform Listing operations of remote resources.
+ * Those remote resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that
+ * we identify each entity only once. Each of these objects, messages, etc. is referred to as an "entity" for the scope of this Processor.
+ * </p>
+ *
+ * <p>
+ * This class is responsible for triggering the listing to occur, filtering the results returned such that only new (unlisted) entities
+ * or entities that have been modified will be emitted from the Processor.
+ * </p>
+ *
+ * <p>
+ * In order to make use of this abstract class, the entities listed must meet the following criteria:
+ * <ul>
+ * <li>
+ * Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
+ * returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
+ * </li>
+ * <li>
+ * Entity must have a unique identifier. This is used in conjunction with the timestamp in order to determine whether or not the entity is
+ * new. If the timestamp of an entity is before the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
+ * than the last timestamp pulled, then the entity is considered new. If the timestamp is equal to the latest timestamp pulled, then the entity's
+ * identifier is compared to all of the entity identifiers that have that same timestamp in order to determine whether or not the entity has been
+ * seen already.
+ * </li> + * <li> + * Entity must have a user-readable name that can be used for logging purposes. + * </li> + * </p> + * + * <p> + * This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the remote system. This is performed using + * two different mechanisms. First, state is stored locally. This allows the system to be restarted and begin processing where it left off. The state that is + * stored is the latest timestamp that has been pulled (as determined by the timestamps of the entities that are returned), as well as the unique identifier of + * each entity that has that timestamp. See the section above for information about how these pieces of information are used in order to determine entity uniqueness. + * </p> + * + * <p> + * In addition to storing state locally, the Processor exposes an optional <code>Distributed Cache Service</code> property. In standalone deployment of NiFi, this is + * not necessary. However, in a clustered environment, subclasses of this class are expected to be run only on primary node. While this means that the local state is + * accurate as long as the primary node remains constant, the primary node in the cluster can be changed. As a result, if running in a clustered environment, it is + * recommended that this property be set. This allows the same state that is described above to also be replicated across the cluster. If this property is set, then + * on restart the Processor will not begin listing until it has retrieved an updated state from this service, as it does not know whether or not another node has + * modified the state in the mean time. + * </p> + * + * <p> + * For each new entity that is listed, the Processor will send a FlowFile to the 'success' relationship. The FlowFile will have no content but will have some set + * of attributes (defined by the concrete implementation) that can be used to fetch those remote resources or interact with them in whatever way makes sense for + * the configured dataflow. + * </p> + * + * <p> + * Subclasses are responsible for the following: + * + * <ul> + * <li> + * Perform a listing of remote resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all + * entities on the remote system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those + * entities will be filtered out. It is therefore not necessary to perform the filtering of timestamps but is provided in order to give the implementation the ability + * to filter those resources on the server side rather than pulling back all of the information, if it makes sense to do so in the concrete implementation. + * </li> + * <li> + * Creating a Map of attributes that are applicable for an entity. The attributes that are assigned to each FlowFile are exactly those returned by the + * {@link #createAttributes(ListableEntity, ProcessContext)}. + * </li> + * <li> + * Returning the configured path. Many resources can be comprised of a "path" (or a "container" or "bucket", etc.) as well as name or identifier that is unique only + * within that path. The {@link #getPath(ProcessContext)} method is responsible for returning the path that is currently being polled for entities. If this does concept + * does not apply for the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method. 
+ * </li> + * <li> + * Determining when the listing must be cleared. It is sometimes necessary to clear state about which entities have already been ingested, as the result of a user + * changing a property value. The {@link #isListingResetNecessary(PropertyDescriptor)} method is responsible for determining when the listing needs to be reset by returning + * a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared. + * </li> + * </ul> + * </p> + */ +@TriggerSerially +public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor { + public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() + .name("Distributed Cache Service") + .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from the remote server so that if a new node " + + "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information will not be shared across the cluster. " + + "This property does not need to be set for standalone instances of NiFi but should be configured if NiFi is run within a cluster.") + .required(false) + .identifiesControllerService(DistributedMapCacheClient.class) + .build(); + + + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that are received are routed to success") + .build(); + + + private volatile Long lastListingTime = null; + private volatile Set<String> latestIdentifiersListed = new HashSet<>(); + private volatile boolean electedPrimaryNode = false; + + protected File getPersistenceFile() { + return new File("conf/state/" + getIdentifier()); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(DISTRIBUTED_CACHE_SERVICE); + return properties; + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (isListingResetNecessary(descriptor)) { + lastListingTime = null; // clear lastListingTime so that we have to fetch new time + latestIdentifiersListed = new HashSet<>(); + } + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + return relationships; + } + + protected String getKey(final String directory) { + return getIdentifier() + ".lastListingTime." + directory; + } + + @OnPrimaryNodeStateChange + public void onPrimaryNodeChange(final PrimaryNodeState newState) { + if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) { + electedPrimaryNode = true; + } + } + + private EntityListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException { + final ObjectMapper mapper = new ObjectMapper(); + final JsonNode jsonNode = mapper.readTree(serializedState); + return mapper.readValue(jsonNode, EntityListing.class); + } + + + private Long getMinTimestamp(final String directory, final DistributedMapCacheClient client) throws IOException { + // Determine the timestamp for the last file that we've listed. 
+ Long minTimestamp = lastListingTime; + if (minTimestamp == null || electedPrimaryNode) { + // We haven't yet restored any state from local or distributed state - or it's been at least a minute since + // we have performed a listing. In this case, + // First, attempt to get timestamp from distributed cache service. + if (client != null) { + try { + final StringSerDe serde = new StringSerDe(); + final String serializedState = client.get(getKey(directory), serde, serde); + if (serializedState == null || serializedState.isEmpty()) { + minTimestamp = null; + this.latestIdentifiersListed = Collections.emptySet(); + } else { + final EntityListing listing = deserialize(serializedState); + this.lastListingTime = listing.getLatestTimestamp().getTime(); + minTimestamp = listing.getLatestTimestamp().getTime(); + this.latestIdentifiersListed = new HashSet<>(listing.getMatchingIdentifiers()); + } + + this.lastListingTime = minTimestamp; + electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore. + } catch (final IOException ioe) { + throw ioe; + } + } + + // Check the persistence file. We want to use the latest timestamp that we have so that + // we don't duplicate data. + try { + final File persistenceFile = getPersistenceFile(); + if (persistenceFile.exists()) { + try (final FileInputStream fis = new FileInputStream(persistenceFile)) { + final Properties props = new Properties(); + props.load(fis); + + // get the local timestamp for this directory, if it exists. + final String locallyPersistedValue = props.getProperty(directory); + if (locallyPersistedValue != null) { + final EntityListing listing = deserialize(locallyPersistedValue); + final long localTimestamp = listing.getLatestTimestamp().getTime(); + + // If distributed state doesn't have an entry or the local entry is later than the distributed state, + // update the distributed state so that we are in sync. + if (client != null && (minTimestamp == null || localTimestamp > minTimestamp)) { + minTimestamp = localTimestamp; + + // Our local persistence file shows a later time than the Distributed service. + // Update the distributed service to match our local state. + try { + final StringSerDe serde = new StringSerDe(); + client.put(getKey(directory), locallyPersistedValue, serde, serde); + } catch (final IOException ioe) { + getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed " + + "state due to {}. If a new node performs Listing, data duplication may occur", + new Object[] {directory, locallyPersistedValue, ioe}); + } + } + } + } + } + } catch (final IOException ioe) { + getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe); + } + } + + return minTimestamp; + } + + + private String serializeState(final List<T> entities) throws JsonGenerationException, JsonMappingException, IOException { + // we need to keep track of all files that we pulled in that had a modification time equal to + // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files + // that have a mod time equal to that timestamp because more files may come in with the same timestamp + // later in the same millisecond. 
+ if (entities.isEmpty()) { + return null; + } else { + final List<T> sortedEntities = new ArrayList<>(entities); + Collections.sort(sortedEntities, new Comparator<ListableEntity>() { + @Override + public int compare(final ListableEntity o1, final ListableEntity o2) { + return Long.compare(o1.getTimestamp(), o2.getTimestamp()); + } + }); + + final long latestListingModTime = sortedEntities.get(sortedEntities.size() - 1).getTimestamp(); + final Set<String> idsWithTimestampEqualToListingTime = new HashSet<>(); + for (int i = sortedEntities.size() - 1; i >= 0; i--) { + final ListableEntity entity = sortedEntities.get(i); + if (entity.getTimestamp() == latestListingModTime) { + idsWithTimestampEqualToListingTime.add(entity.getIdentifier()); + } + } + + this.latestIdentifiersListed = idsWithTimestampEqualToListingTime; + + final EntityListing listing = new EntityListing(); + listing.setLatestTimestamp(new Date(latestListingModTime)); + final Set<String> ids = new HashSet<>(); + for (final String id : idsWithTimestampEqualToListingTime) { + ids.add(id); + } + listing.setMatchingIdentifiers(ids); + + final ObjectMapper mapper = new ObjectMapper(); + final String serializedState = mapper.writerWithType(EntityListing.class).writeValueAsString(listing); + return serializedState; + } + } + + protected void persistLocalState(final String path, final String serializedState) throws IOException { + // we need to keep track of all files that we pulled in that had a modification time equal to + // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files + // that have a mod time equal to that timestamp because more files may come in with the same timestamp + // later in the same millisecond. + final File persistenceFile = getPersistenceFile(); + final File dir = persistenceFile.getParentFile(); + if (!dir.exists() && !dir.mkdirs()) { + throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state"); + } + + final Properties props = new Properties(); + if (persistenceFile.exists()) { + try (final FileInputStream fis = new FileInputStream(persistenceFile)) { + props.load(fis); + } + } + + props.setProperty(path, serializedState); + + try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) { + props.store(fos, null); + } + } + + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final String path = getPath(context); + final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); + + final Long minTimestamp; + try { + minTimestamp = getMinTimestamp(path, client); + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. 
Will not perform listing until this is accomplished."); + context.yield(); + return; + } + + final List<T> entityList; + try { + entityList = performListing(context, minTimestamp); + } catch (final IOException e) { + getLogger().error("Failed to perform listing on remote host due to {}", e); + context.yield(); + return; + } + + if (entityList == null) { + context.yield(); + return; + } + + int listCount = 0; + Long latestListingTimestamp = null; + for (final T entity : entityList) { + final boolean list = (minTimestamp == null || entity.getTimestamp() > minTimestamp || + (entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier()))); + + // Create the FlowFile for this path. + if (list) { + final Map<String, String> attributes = createAttributes(entity, context); + FlowFile flowFile = session.create(); + flowFile = session.putAllAttributes(flowFile, attributes); + session.transfer(flowFile, REL_SUCCESS); + listCount++; + + if (latestListingTimestamp == null || entity.getTimestamp() > latestListingTimestamp) { + latestListingTimestamp = entity.getTimestamp(); + } + } + } + + if (listCount > 0) { + getLogger().info("Successfully created listing with {} new objects", new Object[] {listCount}); + session.commit(); + + // We have performed a listing and pushed the FlowFiles out. + // Now, we need to persist state about the Last Modified timestamp of the newest file + // that we pulled in. We do this in order to avoid pulling in the same file twice. + // However, we want to save the state both locally and remotely. + // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the + // previously Primary Node left off. + // We also store the state locally so that if the node is restarted, and the node cannot contact + // the distributed state cache, the node can continue to run (if it is primary node). + String serializedState = null; + try { + serializedState = serializeState(entityList); + } catch (final Exception e) { + getLogger().error("Failed to serialize state due to {}", new Object[] {e}); + } + + if (serializedState != null) { + // Save our state locally. + try { + persistLocalState(path, serializedState); + } catch (final IOException ioe) { + getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe); + } + + // Attempt to save state to remote server. + if (client != null) { + try { + client.put(getKey(path), serializedState, new StringSerDe(), new StringSerDe()); + } catch (final IOException ioe) { + getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe); + } + } + } + + lastListingTime = latestListingTimestamp; + } else { + getLogger().debug("There is no data to list. Yielding."); + context.yield(); + + // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system + if (lastListingTime == null) { + lastListingTime = 0L; + } + + return; + } + } + + + /** + * Creates a Map of attributes that should be applied to the FlowFile to represent this entity. This processor will emit a FlowFile for each "new" entity + * (see the documentation for this class for a discussion of how this class determines whether or not an entity is "new"). The FlowFile will contain no + * content. The attributes that will be included are exactly the attributes that are returned by this method. 
+     *
+     * @param entity the entity represented by the FlowFile
+     * @param context the ProcessContext for obtaining configuration information
+     * @return a Map of attributes for this entity
+     */
+    protected abstract Map<String, String> createAttributes(T entity, ProcessContext context);
+
+    /**
+     * Returns the path to perform a listing on.
+     * Many resources can be comprised of a "path" (or a "container" or "bucket", etc.) as well as a name or identifier that is unique only
+     * within that path. This method is responsible for returning the path that is currently being polled for entities. If this concept
+     * does not apply to the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method.
+     *
+     * @param context the ProcessContext to use in order to obtain configuration
+     * @return the path that is to be used to perform the listing, or <code>null</code> if not applicable.
+     */
+    protected abstract String getPath(final ProcessContext context);
+
+    /**
+     * Performs a listing of the remote entities that can be pulled. If any entity that is returned has already been "discovered" or "emitted"
+     * by this Processor, it will be ignored. A discussion of how the Processor determines those entities that have already been emitted is
+     * provided above in the documentation for this class. Any entity that is returned by this method with a timestamp prior to minTimestamp
+     * will be filtered out by the Processor. It is therefore not necessary for implementations to perform this filtering, but it can be more
+     * efficient if the filtering is performed on the server side prior to retrieving the information.
+     *
+     * @param context the ProcessContext to use in order to pull the appropriate entities
+     * @param minTimestamp the minimum timestamp of entities that should be returned
+     *
+     * @return a Listing of entities that have a timestamp >= minTimestamp
+     */
+    protected abstract List<T> performListing(final ProcessContext context, final Long minTimestamp) throws IOException;
+
+    /**
+     * Determines whether or not the listing must be reset if the value of the given property is changed
+     *
+     * @param property the property that has changed
+     * @return <code>true</code> if a change in value of the given property necessitates that the listing be reset, <code>false</code> otherwise.
+ */ + protected abstract boolean isListingResetNecessary(final PropertyDescriptor property); + + + + private static class StringSerDe implements Serializer<String>, Deserializer<String> { + @Override + public String deserialize(final byte[] value) throws DeserializationException, IOException { + if (value == null) { + return null; + } + + return new String(value, StandardCharsets.UTF_8); + } + + @Override + public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { + out.write(value.getBytes(StandardCharsets.UTF_8)); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java new file mode 100644 index 0000000..5eecac3 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.FileTransfer; +import org.apache.nifi.processors.standard.util.PermissionDeniedException; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.Tuple; + +/** + * A base class for FetchSFTP, FetchFTP processors + */ +public abstract class FetchFileTransfer extends AbstractProcessor { + static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .name("Hostname") + .description("The fully-qualified hostname or IP address of the host to fetch the data from") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(true) + .build(); + static final PropertyDescriptor UNDEFAULTED_PORT = new PropertyDescriptor.Builder() + .name("Port") + .description("The port to connect to on the remote host to fetch the data from") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(true) + .build(); + public static final PropertyDescriptor REMOTE_FILENAME = new PropertyDescriptor.Builder() + .name("Remote File") + .description("The fully qualified filename on the remote system") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + public static final PropertyDescriptor DELETE_ORIGINAL = new PropertyDescriptor.Builder() + .name("Delete Original") + .description("Determines whether or not the file is deleted from the remote system after it has been successfully transferred") + .defaultValue("true") + .allowableValues("true", "false") + .required(true) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that are received are routed to success") + .build(); + static final Relationship REL_COMMS_FAILURE = new Relationship.Builder() + .name("comms.failure") + .description("Any FlowFile that could not be fetched from the remote server due to a communications failure will be transferred to this Relationship.") + .build(); + static final Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not.found") + .description("Any FlowFile for which we receive a 'Not Found' message from the remote server will be transferred to this Relationship.") + .build(); + static final Relationship REL_PERMISSION_DENIED = new Relationship.Builder() + .name("permission.denied") + .description("Any 
FlowFile that could not be fetched from the remote server due to insufficient permissions will be transferred to this Relationship.") + .build(); + + private final Map<Tuple<String, Integer>, BlockingQueue<FileTransferIdleWrapper>> fileTransferMap = new HashMap<>(); + private final long IDLE_CONNECTION_MILLIS = TimeUnit.SECONDS.toMillis(10L); // amount of time to wait before closing an idle connection + private volatile long lastClearTime = System.currentTimeMillis(); + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_NOT_FOUND); + relationships.add(REL_PERMISSION_DENIED); + relationships.add(REL_COMMS_FAILURE); + return relationships; + } + + /** + * Close connections that are idle or optionally close all connections. + * Connections are considered "idle" if they have not been used in 10 seconds. + * + * @param closeNonIdleConnections if <code>true</code> will close all connection; if <code>false</code> will close only idle connections + */ + private void closeConnections(final boolean closeNonIdleConnections) { + for (final Map.Entry<Tuple<String, Integer>, BlockingQueue<FileTransferIdleWrapper>> entry : fileTransferMap.entrySet()) { + final BlockingQueue<FileTransferIdleWrapper> wrapperQueue = entry.getValue(); + + final List<FileTransferIdleWrapper> putBack = new ArrayList<>(); + FileTransferIdleWrapper wrapper; + while ((wrapper = wrapperQueue.poll()) != null) { + final long lastUsed = wrapper.getLastUsed(); + final long nanosSinceLastUse = System.nanoTime() - lastUsed; + if (!closeNonIdleConnections && TimeUnit.NANOSECONDS.toMillis(nanosSinceLastUse) < IDLE_CONNECTION_MILLIS) { + putBack.add(wrapper); + } else { + try { + wrapper.getFileTransfer().close(); + } catch (final IOException ioe) { + getLogger().warn("Failed to close Idle Connection due to {}", new Object[] {ioe}, ioe); + } + } + } + + for (final FileTransferIdleWrapper toPutBack : putBack) { + wrapperQueue.offer(toPutBack); + } + } + } + + @OnStopped + public void cleanup() { + // close all connections + closeConnections(true); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(HOSTNAME); + properties.add(UNDEFAULTED_PORT); + properties.add(REMOTE_FILENAME); + properties.add(DELETE_ORIGINAL); + return properties; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final StopWatch stopWatch = new StopWatch(true); + final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); + final int port = context.getProperty(UNDEFAULTED_PORT).evaluateAttributeExpressions(flowFile).asInteger(); + final String filename = context.getProperty(REMOTE_FILENAME).evaluateAttributeExpressions(flowFile).getValue(); + + // Try to get a FileTransfer object from our cache. 
+ BlockingQueue<FileTransferIdleWrapper> transferQueue; + synchronized (fileTransferMap) { + final Tuple<String, Integer> tuple = new Tuple<>(host, port); + + transferQueue = fileTransferMap.get(tuple); + if (transferQueue == null) { + transferQueue = new LinkedBlockingQueue<>(); + fileTransferMap.put(tuple, transferQueue); + } + + // periodically close idle connections + if (System.currentTimeMillis() - lastClearTime > IDLE_CONNECTION_MILLIS) { + closeConnections(false); + lastClearTime = System.currentTimeMillis(); + } + } + + // we have a queue of FileTransfer Objects. Get one from the queue or create a new one. + FileTransfer transfer; + FileTransferIdleWrapper transferWrapper = transferQueue.poll(); + if (transferWrapper == null) { + transfer = createFileTransfer(context); + } else { + transfer = transferWrapper.getFileTransfer(); + } + + // Pull data from remote system. + final InputStream in; + try { + in = transfer.getInputStream(filename, flowFile); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + StreamUtils.copy(in, out); + } + }); + transfer.flush(); + transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime())); + } catch (final FileNotFoundException e) { + getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}", + new Object[] {flowFile, filename, host, REL_NOT_FOUND.getName()}); + session.transfer(session.penalize(flowFile), REL_NOT_FOUND); + session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND); + return; + } catch (final PermissionDeniedException e) { + getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}", + new Object[] {flowFile, filename, host, REL_PERMISSION_DENIED.getName()}); + session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED); + session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED); + return; + } catch (final IOException e) { + try { + transfer.close(); + } catch (final IOException e1) { + getLogger().warn("Failed to close connection to {}:{} due to {}", new Object[] {host, port, e.toString()}, e); + } + + getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to failure", + new Object[] {flowFile, filename, host, port, e.toString()}, e); + session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE); + return; + } + + // Add FlowFile attributes + final String protocolName = transfer.getProtocolName(); + final Map<String, String> attributes = new HashMap<>(); + attributes.put(protocolName + ".remote.host", host); + attributes.put(protocolName + ".remote.port", String.valueOf(port)); + attributes.put(protocolName + ".remote.filename", filename); + attributes.put(CoreAttributes.FILENAME.key(), filename); + flowFile = session.putAllAttributes(flowFile, attributes); + + // emit provenance event and transfer FlowFile + session.getProvenanceReporter().modifyContent(flowFile, "Content replaced with content from " + protocolName + "://" + host + ":" + port + "/" + filename, + stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); + + // delete remote file is necessary + final boolean deleteOriginal = context.getProperty(DELETE_ORIGINAL).asBoolean(); + if (deleteOriginal) { + try { + transfer.deleteFile(null, filename); + } catch (final 
FileNotFoundException e) { + // file doesn't exist -- effectively the same as removing it. Move on. + } catch (final IOException ioe) { + getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}", new Object[] {flowFile, host, port, filename, ioe}, ioe); + } + } + } + + + /** + * Creates a new instance of a FileTransfer that can be used to pull files from a remote system. + * + * @param context the ProcessContext to use in order to obtain configured properties + * @return a FileTransfer that can be used to pull files from a remote system + */ + protected abstract FileTransfer createFileTransfer(ProcessContext context); + + /** + * Wrapper around a FileTransfer object that is used to know when the FileTransfer was last used, so that + * we have the ability to close connections that are "idle," or unused for some period of time. + */ + private static class FileTransferIdleWrapper { + private final FileTransfer fileTransfer; + private final long lastUsed; + + public FileTransferIdleWrapper(final FileTransfer fileTransfer, final long lastUsed) { + this.fileTransfer = fileTransfer; + this.lastUsed = lastUsed; + } + + public FileTransfer getFileTransfer() { + return fileTransfer; + } + + public long getLastUsed() { + return this.lastUsed; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java new file mode 100644 index 0000000..6387e19 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.standard.util.FileTransfer; +import org.apache.nifi.processors.standard.util.SFTPTransfer; + + +@SupportsBatching +@Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"}) +@CapabilityDescription("Fetches the content of a file from a remote SFTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.") +@SeeAlso({GetSFTP.class, PutSFTP.class, GetFTP.class, PutFTP.class}) +@WritesAttributes({ + @WritesAttribute(attribute = "sftp.remote.host", description = "The hostname or IP address from which the file was pulled"), + @WritesAttribute(attribute = "sftp.remote.port", description = "The port that was used to communicate with the remote SFTP server"), + @WritesAttribute(attribute = "sftp.remote.filename", description = "The name of the remote file that was pulled"), + @WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename fo the remote file"), +}) +public class FetchSFTP extends FetchFileTransfer { + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(FetchFileTransfer.HOSTNAME); + properties.add(SFTPTransfer.PORT); + properties.add(SFTPTransfer.USERNAME); + properties.add(SFTPTransfer.PASSWORD); + properties.add(SFTPTransfer.PRIVATE_KEY_PATH); + properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE); + properties.add(FetchFileTransfer.REMOTE_FILENAME); + properties.add(SFTPTransfer.DELETE_ORIGINAL); + properties.add(SFTPTransfer.CONNECTION_TIMEOUT); + properties.add(SFTPTransfer.DATA_TIMEOUT); + properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT); + properties.add(SFTPTransfer.HOST_KEY_FILE); + properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING); + properties.add(SFTPTransfer.USE_COMPRESSION); + return properties; + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + if (!validationContext.getProperty(SFTPTransfer.PASSWORD).isSet() && !(validationContext.getProperty(SFTPTransfer.PRIVATE_KEY_PASSPHRASE).isSet() + && validationContext.getProperty(SFTPTransfer.PRIVATE_KEY_PATH).isSet())) { + return Collections.singleton(new ValidationResult.Builder() + .subject("Password") + .valid(false) + .explanation("Must set either password or Private Key Path & Passphrase") + .build()); + } + + return Collections.emptyList(); + } + + @Override + protected FileTransfer createFileTransfer(final ProcessContext context) { + return new SFTPTransfer(context, getLogger()); + } +} 
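The FetchSFTP processor above is driven entirely by incoming FlowFile attributes, which makes it a natural companion to ListSFTP: the listing emits FlowFiles whose 'path' and 'filename' attributes identify each remote file, and FetchSFTP uses those attributes to pull the content. The following is a rough, illustrative sketch (not part of this commit) of driving FetchSFTP with NiFi's mock TestRunner; the hostname, credentials, and file names are placeholders, and the final assertion assumes the configured SFTP server is actually reachable.

// Illustrative sketch only -- not part of this commit. Host, credentials, and
// file names below are placeholders for whatever environment is available.
package org.apache.nifi.processors.standard;

import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.processors.standard.util.SFTPTransfer;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class FetchSFTPExample {

    public static void main(final String[] args) {
        final TestRunner runner = TestRunners.newTestRunner(FetchSFTP.class);
        runner.setProperty(FetchFileTransfer.HOSTNAME, "sftp.example.com");
        runner.setProperty(SFTPTransfer.PORT, "22");
        runner.setProperty(SFTPTransfer.USERNAME, "nifi");
        runner.setProperty(SFTPTransfer.PASSWORD, "secret");

        // Fetch the file identified by the incoming FlowFile's attributes --
        // typically the 'path' and 'filename' attributes written by ListSFTP.
        runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${path}/${filename}");

        // Simulate a FlowFile as ListSFTP would produce it.
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("path", "/data/incoming");
        attributes.put("filename", "sample.txt");
        runner.enqueue(new byte[0], attributes);

        runner.run();

        // If the remote file was retrieved, its content replaces the FlowFile's
        // content and the FlowFile is routed to 'success'.
        runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
    }
}

If the host cannot be reached, the FlowFile is routed to 'comms.failure' instead (or to 'not.found' / 'permission.denied' for the corresponding server responses), so a sketch written this way only passes against a live server.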
http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java new file mode 100644 index 0000000..d6e1cd1 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.FileInfo; +import org.apache.nifi.processors.standard.util.FileTransfer; + +public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> { + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .name("Hostname") + .description("The fully qualified hostname or IP address of the remote system") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .expressionLanguageSupported(true) + .build(); + public static final PropertyDescriptor REMOTE_PATH = new PropertyDescriptor.Builder() + .name("Remote Path") + .description("The path on the remote system from which to pull or push files") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue(".") + .build(); + + + @Override + protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) { + final Map<String, String> attributes = new HashMap<>(); + attributes.put(getProtocolName() + ".remote.host", context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue()); + attributes.put("file.owner", fileInfo.getOwner()); + attributes.put("file.group", fileInfo.getGroup()); + attributes.put("file.permissions", fileInfo.getPermissions()); + attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName()); + + final String fullPath = fileInfo.getFullPathFileName(); + if (fullPath != null) { + final int index = fullPath.lastIndexOf("/"); + if (index > -1) { + final 
String path = fullPath.substring(0, index); + attributes.put(CoreAttributes.PATH.key(), path); + } + } + return attributes; + } + + @Override + protected String getPath(final ProcessContext context) { + return context.getProperty(REMOTE_PATH).getValue(); + } + + @Override + protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException { + final FileTransfer transfer = getFileTransfer(context); + final List<FileInfo> listing = transfer.getListing(); + if (minTimestamp == null) { + return listing; + } + + final Iterator<FileInfo> itr = listing.iterator(); + while (itr.hasNext()) { + final FileInfo next = itr.next(); + if (next.getLastModifiedTime() < minTimestamp) { + itr.remove(); + } + } + + return listing; + } + + @Override + protected boolean isListingResetNecessary(final PropertyDescriptor property) { + return HOSTNAME.equals(property) || REMOTE_PATH.equals(property); + } + + protected abstract FileTransfer getFileTransfer(final ProcessContext context); + + protected abstract String getProtocolName(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java new file mode 100644 index 0000000..3b6b69e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.standard.util.FileTransfer; +import org.apache.nifi.processors.standard.util.SFTPTransfer; + +@TriggerSerially +@Tags({"list", "sftp", "remote", "ingest", "source", "input", "files"}) +@CapabilityDescription("Performs a listing of the files residing on an SFTP server. 
For each file that is found on the remote server, a new FlowFile will be created with the filename attribute " + + "set to the name of the file on the remote server. This can then be used in conjunction with FetchSFTP in order to fetch those files.") +@SeeAlso({FetchSFTP.class, GetSFTP.class, PutSFTP.class}) +@WritesAttributes({ + @WritesAttribute(attribute = "sftp.remote.host", description = "The hostname of the SFTP Server"), + @WritesAttribute(attribute = "file.owner", description = "The numeric owner id of the source file"), + @WritesAttribute(attribute = "file.group", description = "The numeric group id of the source file"), + @WritesAttribute(attribute = "file.permissions", description = "The read/write/execute permissions of the source file"), + @WritesAttribute(attribute = "filename", description = "The name of the file on the SFTP Server"), + @WritesAttribute(attribute = "path", description = "The fully qualified name of the directory on the SFTP Server from which the file was pulled"), +}) +public class ListSFTP extends ListFileTransfer { + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(SFTPTransfer.HOSTNAME); + properties.add(SFTPTransfer.PORT); + properties.add(SFTPTransfer.USERNAME); + properties.add(SFTPTransfer.PASSWORD); + properties.add(SFTPTransfer.PRIVATE_KEY_PATH); + properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE); + properties.add(REMOTE_PATH); + properties.add(DISTRIBUTED_CACHE_SERVICE); + properties.add(SFTPTransfer.RECURSIVE_SEARCH); + properties.add(SFTPTransfer.FILE_FILTER_REGEX); + properties.add(SFTPTransfer.PATH_FILTER_REGEX); + properties.add(SFTPTransfer.IGNORE_DOTTED_FILES); + properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING); + properties.add(SFTPTransfer.HOST_KEY_FILE); + properties.add(SFTPTransfer.CONNECTION_TIMEOUT); + properties.add(SFTPTransfer.DATA_TIMEOUT); + properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT); + return properties; + } + + @Override + protected FileTransfer getFileTransfer(final ProcessContext context) { + return new SFTPTransfer(context, getLogger()); + } + + @Override + protected String getProtocolName() { + return "sftp"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java new file mode 100644 index 0000000..56489f0 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard.util; + +import java.util.Collection; +import java.util.Date; + +import javax.xml.bind.annotation.XmlTransient; +import javax.xml.bind.annotation.XmlType; + +/** + * A simple POJO for maintaining state about the last entities listed by an AbstractListProcessor that was performed so that + * we can avoid pulling the same file multiple times + */ +@XmlType(name = "listing") +public class EntityListing { + + private Date latestTimestamp; + private Collection<String> matchingIdentifiers; + + /** + * @return the modification date of the newest file that was contained in the listing + */ + public Date getLatestTimestamp() { + return latestTimestamp; + } + + /** + * Sets the timestamp of the modification date of the newest file that was contained in the listing + * + * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the listing + */ + public void setLatestTimestamp(Date latestTimestamp) { + this.latestTimestamp = latestTimestamp; + } + + /** + * @return a Collection containing the identifiers of all entities in the listing whose timestamp + * was equal to {@link #getLatestTimestamp()} + */ + @XmlTransient + public Collection<String> getMatchingIdentifiers() { + return matchingIdentifiers; + } + + /** + * Sets the Collection containing the identifiers of all entities in the listing whose Timestamp was + * equal to {@link #getLatestTimestamp()} + * + * @param matchingIdentifiers the identifiers that have last modified date matching the latest timestamp + */ + public void setMatchingIdentifiers(Collection<String> matchingIdentifiers) { + this.matchingIdentifiers = matchingIdentifiers; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java index 41a42bb..7f659d4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java @@ -34,16 +34,16 @@ import java.util.Locale; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPFile; +import org.apache.commons.net.ftp.FTPHTTPClient; +import org.apache.commons.net.ftp.FTPReply; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; 
import org.apache.nifi.processor.util.StandardValidators; -import org.apache.commons.net.ftp.FTPClient; -import org.apache.commons.net.ftp.FTPFile; -import org.apache.commons.net.ftp.FTPHTTPClient; -import org.apache.commons.net.ftp.FTPReply; public class FTPTransfer implements FileTransfer { @@ -57,53 +57,53 @@ public class FTPTransfer implements FileTransfer { public static final String PROXY_TYPE_SOCKS = Proxy.Type.SOCKS.name(); public static final PropertyDescriptor CONNECTION_MODE = new PropertyDescriptor.Builder() - .name("Connection Mode") - .description("The FTP Connection Mode") - .allowableValues(CONNECTION_MODE_ACTIVE, CONNECTION_MODE_PASSIVE) - .defaultValue(CONNECTION_MODE_PASSIVE) - .build(); + .name("Connection Mode") + .description("The FTP Connection Mode") + .allowableValues(CONNECTION_MODE_ACTIVE, CONNECTION_MODE_PASSIVE) + .defaultValue(CONNECTION_MODE_PASSIVE) + .build(); public static final PropertyDescriptor TRANSFER_MODE = new PropertyDescriptor.Builder() - .name("Transfer Mode") - .description("The FTP Transfer Mode") - .allowableValues(TRANSFER_MODE_BINARY, TRANSFER_MODE_ASCII) - .defaultValue(TRANSFER_MODE_BINARY) - .build(); + .name("Transfer Mode") + .description("The FTP Transfer Mode") + .allowableValues(TRANSFER_MODE_BINARY, TRANSFER_MODE_ASCII) + .defaultValue(TRANSFER_MODE_BINARY) + .build(); public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() - .name("Port") - .description("The port that the remote system is listening on for file transfers") - .addValidator(StandardValidators.PORT_VALIDATOR) - .required(true) - .defaultValue("21") - .build(); + .name("Port") + .description("The port that the remote system is listening on for file transfers") + .addValidator(StandardValidators.PORT_VALIDATOR) + .required(true) + .defaultValue("21") + .build(); public static final PropertyDescriptor PROXY_TYPE = new PropertyDescriptor.Builder() - .name("Proxy Type") - .description("Proxy type used for file transfers") - .allowableValues(PROXY_TYPE_DIRECT, PROXY_TYPE_HTTP, PROXY_TYPE_SOCKS) - .defaultValue(PROXY_TYPE_DIRECT) - .build(); + .name("Proxy Type") + .description("Proxy type used for file transfers") + .allowableValues(PROXY_TYPE_DIRECT, PROXY_TYPE_HTTP, PROXY_TYPE_SOCKS) + .defaultValue(PROXY_TYPE_DIRECT) + .build(); public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder() - .name("Proxy Host") - .description("The fully qualified hostname or IP address of the proxy server") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Proxy Host") + .description("The fully qualified hostname or IP address of the proxy server") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder() - .name("Proxy Port") - .description("The port of the proxy server") - .addValidator(StandardValidators.PORT_VALIDATOR) - .build(); + .name("Proxy Port") + .description("The port of the proxy server") + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder() - .name("Http Proxy Username") - .description("Http Proxy Username") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .required(false) - .build(); + .name("Http Proxy Username") + .description("Http Proxy Username") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); public static final PropertyDescriptor HTTP_PROXY_PASSWORD = 
new PropertyDescriptor.Builder() - .name("Http Proxy Password") - .description("Http Proxy Password") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .required(false) - .sensitive(true) - .build(); + .name("Http Proxy Password") + .description("Http Proxy Password") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .sensitive(true) + .build(); private final ProcessorLog logger; @@ -135,7 +135,7 @@ public class FTPTransfer implements FileTransfer { client.disconnect(); } } catch (final Exception ex) { - logger.warn("Failed to close FTPClient due to {}", new Object[]{ex.toString()}, ex); + logger.warn("Failed to close FTPClient due to {}", new Object[] { ex.toString() }, ex); } client = null; } @@ -261,19 +261,24 @@ public class FTPTransfer implements FileTransfer { perms.append(file.hasPermission(FTPFile.WORLD_ACCESS, FTPFile.EXECUTE_PERMISSION) ? "x" : "-"); FileInfo.Builder builder = new FileInfo.Builder() - .filename(file.getName()) - .fullPathFileName(newFullForwardPath) - .directory(file.isDirectory()) - .size(file.getSize()) - .lastModifiedTime(file.getTimestamp().getTimeInMillis()) - .permissions(perms.toString()) - .owner(file.getUser()) - .group(file.getGroup()); + .filename(file.getName()) + .fullPathFileName(newFullForwardPath) + .directory(file.isDirectory()) + .size(file.getSize()) + .lastModifiedTime(file.getTimestamp().getTimeInMillis()) + .permissions(perms.toString()) + .owner(file.getUser()) + .group(file.getGroup()); return builder.build(); } @Override - public InputStream getInputStream(final String remoteFileName) throws IOException { + public InputStream getInputStream(String remoteFileName) throws IOException { + return getInputStream(remoteFileName, null); + } + + @Override + public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException { final FTPClient client = getClient(null); InputStream in = client.retrieveFileStream(remoteFileName); if (in == null) { @@ -329,9 +334,9 @@ public class FTPTransfer implements FileTransfer { final boolean cdSuccessful = setWorkingDirectory(remoteDirectory); if (!cdSuccessful) { - logger.debug("Remote Directory {} does not exist; creating it", new Object[]{remoteDirectory}); + logger.debug("Remote Directory {} does not exist; creating it", new Object[] { remoteDirectory }); if (client.makeDirectory(remoteDirectory)) { - logger.debug("Created {}", new Object[]{remoteDirectory}); + logger.debug("Created {}", new Object[] { remoteDirectory }); } else { throw new IOException("Failed to create remote directory " + remoteDirectory); } @@ -387,10 +392,10 @@ public class FTPTransfer implements FileTransfer { final String time = outformat.format(fileModifyTime); if (!client.setModificationTime(tempFilename, time)) { // FTP server probably doesn't support MFMT command - logger.warn("Could not set lastModifiedTime on {} to {}", new Object[]{flowFile, lastModifiedTime}); + logger.warn("Could not set lastModifiedTime on {} to {}", new Object[] { flowFile, lastModifiedTime }); } } catch (final Exception e) { - logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[]{flowFile, lastModifiedTime, e}); + logger.error("Failed to set lastModifiedTime on {} to {} due to {}", new Object[] { flowFile, lastModifiedTime, e }); } } final String permissions = ctx.getProperty(PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue(); @@ -399,17 +404,17 @@ public class FTPTransfer implements FileTransfer { int perms = numberPermissions(permissions); if 
(perms >= 0) { if (!client.sendSiteCommand("chmod " + Integer.toOctalString(perms) + " " + tempFilename)) { - logger.warn("Could not set permission on {} to {}", new Object[]{flowFile, permissions}); + logger.warn("Could not set permission on {} to {}", new Object[] { flowFile, permissions }); } } } catch (final Exception e) { - logger.error("Failed to set permission on {} to {} due to {}", new Object[]{flowFile, permissions, e}); + logger.error("Failed to set permission on {} to {} due to {}", new Object[] { flowFile, permissions, e }); } } if (!filename.equals(tempFilename)) { try { - logger.debug("Renaming remote path from {} to {} for {}", new Object[]{tempFilename, filename, flowFile}); + logger.debug("Renaming remote path from {} to {} for {}", new Object[] { tempFilename, filename, flowFile }); final boolean renameSuccessful = client.rename(tempFilename, filename); if (!renameSuccessful) { throw new IOException("Failed to rename temporary file " + tempFilename + " to " + fullPath + " due to: " + client.getReplyString()); @@ -513,13 +518,13 @@ public class FTPTransfer implements FileTransfer { inetAddress = InetAddress.getByName(remoteHostname); } - client.connect(inetAddress, ctx.getProperty(PORT).asInteger()); + client.connect(inetAddress, ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).asInteger()); this.closed = false; client.setDataTimeout(ctx.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); client.setSoTimeout(ctx.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - final String username = ctx.getProperty(USERNAME).getValue(); - final String password = ctx.getProperty(PASSWORD).getValue(); + final String username = ctx.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue(); + final String password = ctx.getProperty(PASSWORD).evaluateAttributeExpressions(flowFile).getValue(); final boolean loggedIn = client.login(username, password); if (!loggedIn) { throw new IOException("Could not login for user '" + username + "'"); @@ -532,7 +537,7 @@ public class FTPTransfer implements FileTransfer { client.enterLocalPassiveMode(); } - final String transferMode = ctx.getProperty(TRANSFER_MODE).getValue(); + final String transferMode = ctx.getProperty(TRANSFER_MODE).evaluateAttributeExpressions(flowFile).getValue(); final int fileType = (transferMode.equalsIgnoreCase(TRANSFER_MODE_ASCII)) ? 
FTPClient.ASCII_FILE_TYPE : FTPClient.BINARY_FILE_TYPE; if (!client.setFileType(fileType)) { throw new IOException("Unable to set transfer mode to type " + transferMode); http://git-wip-us.apache.org/repos/asf/nifi/blob/d1d57931/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java index c57b4e0..b893f75 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java @@ -18,7 +18,7 @@ package org.apache.nifi.processors.standard.util; import java.io.Serializable; -public class FileInfo implements Comparable<FileInfo>, Serializable { +public class FileInfo implements Comparable<FileInfo>, Serializable, ListableEntity { private static final long serialVersionUID = 1L; @@ -164,4 +164,20 @@ public class FileInfo implements Comparable<FileInfo>, Serializable { return this; } } + + @Override + public String getName() { + return getFileName(); + } + + @Override + public String getIdentifier() { + final String fullPathName = getFullPathFileName(); + return fullPathName == null ? getName() : fullPathName; + } + + @Override + public long getTimestamp() { + return getLastModifiedTime(); + } }
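The FileInfo additions above expose getName(), getIdentifier() and getTimestamp() so that listed files can be treated as generic ListableEntity instances. Below is a minimal sketch of the "is this entity new?" decision implied by EntityListing's latestTimestamp/matchingIdentifiers pair; the helper class name is hypothetical and this is not the committed AbstractListProcessor logic, only an illustration of the rule.

    import java.util.Collection;

    import org.apache.nifi.processors.standard.util.ListableEntity;

    // Hypothetical helper, for illustration only: an entity is "new" if it is
    // strictly newer than the latest timestamp previously listed, or if it ties
    // that timestamp but its identifier has not been emitted before.
    public class ListingFilterSketch {

        public static boolean isNew(final ListableEntity entity,
                                    final long latestTimestamp,
                                    final Collection<String> identifiersAtLatestTimestamp) {
            if (entity.getTimestamp() > latestTimestamp) {
                return true;   // strictly newer than anything previously listed
            }
            if (entity.getTimestamp() < latestTimestamp) {
                return false;  // older than the newest entity already emitted
            }
            // Timestamp ties the newest previous listing: new only if this
            // identifier was not among those already seen at that timestamp.
            return !identifiersAtLatestTimestamp.contains(entity.getIdentifier());
        }
    }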

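EntityListing itself is a plain bean with getters and setters, so it can be round-tripped through a JSON mapper for persistence, for example into a distributed cache. The sketch below uses the Jackson 1.x ObjectMapper purely as an illustration and assumes it is on the classpath; whether the processors persist the listing in exactly this form is not shown in these hunks.

    import java.util.Arrays;
    import java.util.Date;

    import org.apache.nifi.processors.standard.util.EntityListing;
    import org.codehaus.jackson.map.ObjectMapper;

    // Illustrative round trip only: serialize an EntityListing to JSON and read
    // it back. The identifiers and timestamp used here are made-up sample data.
    public class EntityListingRoundTrip {

        public static void main(final String[] args) throws Exception {
            final EntityListing listing = new EntityListing();
            listing.setLatestTimestamp(new Date());
            listing.setMatchingIdentifiers(Arrays.asList("/data/a.txt", "/data/b.txt"));

            final ObjectMapper mapper = new ObjectMapper();
            final String json = mapper.writeValueAsString(listing);                      // serialize
            final EntityListing restored = mapper.readValue(json, EntityListing.class);  // deserialize

            System.out.println(json);
            System.out.println(restored.getLatestTimestamp());
        }
    }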