Repository: nifi Updated Branches: refs/heads/master d01449ee7 -> 1a512cd1e
NIFI-1484 Making use of timestamps at various points of execution to provide listing of all but the latest files which are held until a subsequent execution. Correcting nifi-amqp-nar bundle's pom description. This closes #212. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1a512cd1 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1a512cd1 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1a512cd1 Branch: refs/heads/master Commit: 1a512cd1e67e6d0231e9dcde9d32472fad4c5bd2 Parents: d01449e Author: Aldrin Piri <ald...@apache.org> Authored: Tue Feb 9 15:03:58 2016 -0500 Committer: Aldrin Piri <ald...@apache.org> Committed: Wed Feb 10 10:25:47 2016 -0500 ---------------------------------------------------------------------- nifi-nar-bundles/nifi-amqp-bundle/pom.xml | 2 +- .../standard/AbstractListProcessor.java | 312 ++++++++++--------- .../nifi/processors/standard/ListFile.java | 4 +- .../nifi/processors/standard/ListSFTP.java | 4 +- .../standard/TestAbstractListProcessor.java | 219 ++++++++++--- .../nifi/processors/standard/TestListFile.java | 164 ++++++++-- 6 files changed, 498 insertions(+), 207 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/1a512cd1/nifi-nar-bundles/nifi-amqp-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-amqp-bundle/pom.xml b/nifi-nar-bundles/nifi-amqp-bundle/pom.xml index 1ab9109..fff9905 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-amqp-bundle/pom.xml @@ -23,7 +23,7 @@ <artifactId>nifi-amqp-bundle</artifactId> <version>0.5.0-SNAPSHOT</version> <packaging>pom</packaging> - <description>A bundle of processors that run Flume sources/sinks</description> + <description>A bundle of processors that publish to and consume messages from AMQP.</description> <modules> 
<module>nifi-amqp-processors</module> <module>nifi-amqp-nar</module> http://git-wip-us.apache.org/repos/asf/nifi/blob/1a512cd1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java index b04deb3..08b8b28 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java @@ -23,13 +23,13 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.TreeMap; import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.TriggerSerially; @@ -64,59 +64,44 @@ import org.codehaus.jackson.map.ObjectMapper; * Those remote resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that * we identity the entity only once. Each of these objects, messages, etc. is referred to as an "entity" for the scope of this Processor. * </p> - * * <p> * This class is responsible for triggering the listing to occur, filtering the results returned such that only new (unlisted) entities * or entities that have been modified will be emitted from the Processor. 
* </p> - * * <p> * In order to make use of this abstract class, the entities listed must meet the following criteria: * </p> - * * <ul> - * <li> - * Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is - * returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled. - * </li> - * <li> - * Entity must have a unique identifier. This is used in conjunction with the timestamp in order to determine whether or not the entity is - * new. If the timestamp of an entity is before the latest timestamp pulled, then the entity is not considered new. If the timestamp is later - * than the last timestamp pulled, then the entity is considered new. If the timestamp is equal to the latest timestamp pulled, then the entity's - * identifier is compared to all of the entity identifiers that have that same timestamp in order to determine whether or not the entity has been - * seen already. - * </li> - * <li> - * Entity must have a user-readable name that can be used for logging purposes. - * </li> + * <li> + * Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is + * returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled. + * </li> + * <li> + * If the timestamp of an entity is before OR equal to the latest timestamp pulled, then the entity is not considered new. If the timestamp is later + * than the last timestamp pulled, then the entity is considered new. + * </li> + * <li> + * Entity must have a user-readable name that can be used for logging purposes. + * </li> * </ul> - * * <p> - * This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the remote system. This is performed using - * two different mechanisms. First, state is stored locally. 
This allows the system to be restarted and begin processing where it left off. The state that is - * stored is the latest timestamp that has been pulled (as determined by the timestamps of the entities that are returned), as well as the unique identifier of - * each entity that has that timestamp. See the section above for information about how these pieces of information are used in order to determine entity uniqueness. + * This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the target system given the above criteria. This is + * performed using the {@link StateManager}. This allows the system to be restarted and begin processing where it left off. The state that is stored is the latest timestamp + * that has been pulled (as determined by the timestamps of the entities that are returned). See the section above for information about how this information is used in order to + * determine new entities. * </p> - * * <p> - * In addition to storing state locally, the Processor exposes an optional <code>Distributed Cache Service</code> property. In standalone deployment of NiFi, this is - * not necessary. However, in a clustered environment, subclasses of this class are expected to be run only on primary node. While this means that the local state is - * accurate as long as the primary node remains constant, the primary node in the cluster can be changed. As a result, if running in a clustered environment, it is - * recommended that this property be set. This allows the same state that is described above to also be replicated across the cluster. If this property is set, then - * on restart the Processor will not begin listing until it has retrieved an updated state from this service, as it does not know whether or not another node has - * modified the state in the mean time. 
+ * NOTE: This processor performs migrations of legacy state mechanisms inclusive of locally stored, file-based state and the optional utilization of the <code>Distributed Cache + * Service</code> property to the new {@link StateManager} functionality. Upon successful migration, the associated data from one or both of the legacy mechanisms is purged. * </p> - * * <p> * For each new entity that is listed, the Processor will send a FlowFile to the 'success' relationship. The FlowFile will have no content but will have some set * of attributes (defined by the concrete implementation) that can be used to fetch those remote resources or interact with them in whatever way makes sense for * the configured dataflow. * </p> - * * <p> * Subclasses are responsible for the following: * </p> - * * <ul> * <li> * Perform a listing of remote resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all @@ -141,9 +126,10 @@ import org.codehaus.jackson.map.ObjectMapper; * </ul> */ @TriggerSerially -@Stateful(scopes={Scope.LOCAL, Scope.CLUSTER}, description="After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state, " - + "along with all resources that have that same timestmap so that the Processor can avoid data duplication. The scope used depends on the implementation.") +@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state. 
" + + "The scope used depends on the implementation.") public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor { + public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() .name("Distributed Cache Service") .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from the remote server so that if a new node " @@ -153,21 +139,25 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab .identifiesControllerService(DistributedMapCacheClient.class) .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("All FlowFiles that are received are routed to success") .build(); - - private volatile Set<String> latestIdentifiersListed = new HashSet<>(); private volatile Long lastListingTime = null; + private volatile Long lastProcessedTime = 0L; + private volatile Long lastRunTime = 0L; private volatile boolean justElectedPrimaryNode = false; - private volatile boolean resetListing = false; + private volatile boolean resetState = false; - static final String TIMESTAMP = "timestamp"; - static final String IDENTIFIER_PREFIX = "id"; + /* + * A constant used in determining an internal "yield" of processing files. 
Given the logic to provide a pause on the newest + * files according to timestamp, it is ensured that at least the specified millis has been eclipsed to avoid getting scheduled + * near instantaneously after the prior iteration effectively voiding the built in buffer + */ + static final long LISTING_LAG_MILLIS = 100L; + static final String LISTING_TIMESTAMP_KEY = "listing.timestamp"; + static final String PROCESSED_TIMESTAMP_KEY = "processed.timestamp"; protected File getPersistenceFile() { return new File("conf/state/" + getIdentifier()); @@ -183,12 +173,12 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { if (isConfigurationRestored() && isListingResetNecessary(descriptor)) { - lastListingTime = null; // clear lastListingTime so that we have to fetch new time - latestIdentifiersListed = new HashSet<>(); - resetListing = true; + resetTimeStates(); // clear lastListingTime so that we have to fetch new time + resetState = true; } } + @Override public Set<Relationship> getRelationships() { final Set<Relationship> relationships = new HashSet<>(); @@ -218,49 +208,53 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab } } - // remove entry from Distributed cache server - if (client != null) { - try { - client.remove(path, new StringSerDe()); - } catch (final IOException ioe) { - getLogger().warn("Failed to remove entry from Distributed Cache Service. 
However, the state has already been migrated to use the new " - + "State Management service, so the Distributed Cache Service is no longer needed."); - } + // When scheduled to run, check if the associated timestamp is null, signifying a clearing of state and reset the internal timestamp + if (lastListingTime != null && stateMap.get(LISTING_TIMESTAMP_KEY) == null) { + getLogger().info("Detected that state was cleared for this component. Resetting internal values."); + resetTimeStates(); } - if (resetListing) { + if (resetState) { context.getStateManager().clear(getStateScope(context)); - resetListing = false; + resetState = false; } } /** * This processor used to use the DistributedMapCacheClient in order to store cluster-wide state, before the introduction of * the StateManager. This method will migrate state from that DistributedMapCacheClient, or from a local file, to the StateManager, - * if any state already exists + * if any state already exists. More specifically, this will extract out the relevant timestamp for when the processor last ran * - * @param path the path to migrate state for - * @param client the DistributedMapCacheClient that is capable of obtaining the current state + * @param path the path to migrate state for + * @param client the DistributedMapCacheClient that is capable of obtaining the current state * @param stateManager the StateManager to use in order to store the new state - * @param scope the scope to use + * @param scope the scope to use * @throws IOException if unable to retrieve or store the state */ private void migrateState(final String path, final DistributedMapCacheClient client, final StateManager stateManager, final Scope scope) throws IOException { Long minTimestamp = null; - final Set<String> latestIdentifiersListed = new HashSet<>(); - // Retrieve state from Distributed Cache Client + // Retrieve state from Distributed Cache Client, establishing the latest file seen if (client != null) { final StringSerDe serde = new 
StringSerDe(); final String serializedState = client.get(getKey(path), serde, serde); if (serializedState != null && !serializedState.isEmpty()) { final EntityListing listing = deserialize(serializedState); minTimestamp = listing.getLatestTimestamp().getTime(); - latestIdentifiersListed.addAll(listing.getMatchingIdentifiers()); + } + + // remove entry from distributed cache server + if (client != null) { + try { + client.remove(path, new StringSerDe()); + } catch (final IOException ioe) { + getLogger().warn("Failed to remove entry from Distributed Cache Service. However, the state has already been migrated to use the new " + + "State Management service, so the Distributed Cache Service is no longer needed."); + } } } - // Retrieve state from locally persisted file + // Retrieve state from locally persisted file, and compare these to the minTimestamp established from the distributedCache, if there was one final File persistenceFile = getPersistenceFile(); if (persistenceFile.exists()) { final Properties props = new Properties(); @@ -273,10 +267,9 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab if (locallyPersistedValue != null) { final EntityListing listing = deserialize(locallyPersistedValue); final long localTimestamp = listing.getLatestTimestamp().getTime(); + // if the local file's latest timestamp is beyond that of the value provided from the cache, replace if (minTimestamp == null || localTimestamp > minTimestamp) { minTimestamp = localTimestamp; - latestIdentifiersListed.clear(); - latestIdentifiersListed.addAll(listing.getMatchingIdentifiers()); } } @@ -287,18 +280,14 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab } if (minTimestamp != null) { - persist(minTimestamp, latestIdentifiersListed, stateManager, scope); + persist(minTimestamp, minTimestamp, stateManager, scope); } } - private void persist(final long timestamp, final Collection<String> identifiers, final StateManager 
stateManager, final Scope scope) throws IOException { - final Map<String, String> updatedState = new HashMap<>(identifiers.size() + 1); - updatedState.put(TIMESTAMP, String.valueOf(timestamp)); - int counter = 0; - for (final String identifier : identifiers) { - final String index = String.valueOf(++counter); - updatedState.put(IDENTIFIER_PREFIX + "." + index, identifier); - } + private void persist(final long listingTimestamp, final long processedTimestamp, final StateManager stateManager, final Scope scope) throws IOException { + final Map<String, String> updatedState = new HashMap<>(1); + updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(listingTimestamp)); + updatedState.put(PROCESSED_TIMESTAMP_KEY, String.valueOf(processedTimestamp)); stateManager.setState(updatedState, scope); } @@ -316,41 +305,38 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { Long minTimestamp = lastListingTime; - try { - // We need to fetch the state from the cluster if we don't yet know the last listing time, - // or if we were just elected the primary node - if (this.lastListingTime == null || justElectedPrimaryNode) { + + if (this.lastListingTime == null || this.lastProcessedTime == null || justElectedPrimaryNode) { + try { + // Attempt to retrieve state from the state manager if a last listing was not yet established or + // if just elected the primary node final StateMap stateMap = context.getStateManager().getState(getStateScope(context)); - final Map<String, String> stateValues = stateMap.toMap(); - final String timestamp = stateValues.get(TIMESTAMP); - - if (timestamp == null) { - minTimestamp = 0L; - latestIdentifiersListed.clear(); - } else { - minTimestamp = this.lastListingTime = Long.parseLong(timestamp); - latestIdentifiersListed.clear(); - for (final Map.Entry<String, String> entry : stateValues.entrySet()) { - final 
String key = entry.getKey(); - final String value = entry.getValue(); - if (TIMESTAMP.equals(key)) { - continue; - } - - latestIdentifiersListed.add(value); + final String listingTimestampString = stateMap.get(LISTING_TIMESTAMP_KEY); + final String lastProcessedString= stateMap.get(PROCESSED_TIMESTAMP_KEY); + if (lastProcessedString != null) { + this.lastProcessedTime = Long.parseLong(lastProcessedString); + } + if (listingTimestampString != null) { + minTimestamp = Long.parseLong(listingTimestampString); + // If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates + if (minTimestamp == this.lastListingTime) { + context.yield(); + return; + } else { + this.lastListingTime = minTimestamp; } } - justElectedPrimaryNode = false; + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished."); + context.yield(); + return; } - } catch (final IOException ioe) { - getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. 
Will not perform listing until this is accomplished."); - context.yield(); - return; } final List<T> entityList; try { + // track of when this last executed for consideration of the lag millis entityList = performListing(context, minTimestamp); } catch (final IOException e) { getLogger().error("Failed to perform listing on remote host due to {}", e); @@ -358,65 +344,93 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab return; } - if (entityList == null) { + if (entityList == null || entityList.isEmpty()) { context.yield(); return; } Long latestListingTimestamp = null; - final List<T> newEntries = new ArrayList<>(); + final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>(); + + // Build a sorted map to determine the latest possible entries for (final T entity : entityList) { - final boolean newTimestamp = minTimestamp == null || entity.getTimestamp() > minTimestamp; - final boolean newEntryForTimestamp = minTimestamp != null && entity.getTimestamp() == minTimestamp && !latestIdentifiersListed.contains(entity.getIdentifier()); - final boolean list = newTimestamp || newEntryForTimestamp; - - // Create the FlowFile for this path. - if (list) { - final Map<String, String> attributes = createAttributes(entity, context); - FlowFile flowFile = session.create(); - flowFile = session.putAllAttributes(flowFile, attributes); - session.transfer(flowFile, REL_SUCCESS); - - // If we don't have a new timestamp but just have a new entry, we need to - // add all of the previous entries to our entityList. If we have a new timestamp, - // then the previous entries can go away. 
- if (!newTimestamp) { - newEntries.addAll(entityList); + final long entityTimestamp = entity.getTimestamp(); + // New entries are all those that occur at or after the associated timestamp + final boolean newEntry = minTimestamp == null || entityTimestamp >= minTimestamp && entityTimestamp > lastProcessedTime; + + if (newEntry) { + List<T> entitiesForTimestamp = orderedEntries.get(entity.getTimestamp()); + if (entitiesForTimestamp == null) { + entitiesForTimestamp = new ArrayList<T>(); + orderedEntries.put(entity.getTimestamp(), entitiesForTimestamp); + } + entitiesForTimestamp.add(entity); + } + } + + int flowfilesCreated = 0; + + if (orderedEntries.size() > 0) { + latestListingTimestamp = orderedEntries.lastKey(); + + // If the last listing time is equal to the newest entries previously seen, + // another iteration has occurred without new files and special handling is needed to avoid starvation + if (latestListingTimestamp.equals(lastListingTime)) { + /* We are done when either: + * - we have not eclipsed the minimal listing lag needed due to being triggered too soon after the last run + * - the latest listing timestamp is equal to the last processed time, meaning we handled those items originally passed over + */ + if (System.currentTimeMillis() - lastRunTime < LISTING_LAG_MILLIS || latestListingTimestamp.equals(lastProcessedTime)) { + context.yield(); + return; } - newEntries.add(entity); + } else { + // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data + orderedEntries.remove(latestListingTimestamp); + } - if (latestListingTimestamp == null || entity.getTimestamp() > latestListingTimestamp) { - latestListingTimestamp = entity.getTimestamp(); + for (List<T> timestampEntities : orderedEntries.values()) { + for (T entity : timestampEntities) { + // Create the FlowFile for this path. 
+ final Map<String, String> attributes = createAttributes(entity, context); + FlowFile flowFile = session.create(); + flowFile = session.putAllAttributes(flowFile, attributes); + session.transfer(flowFile, REL_SUCCESS); + flowfilesCreated++; } } } - final int listCount = newEntries.size(); - if (listCount > 0) { - getLogger().info("Successfully created listing with {} new objects", new Object[] {listCount}); - session.commit(); - - // We have performed a listing and pushed the FlowFiles out. - // Now, we need to persist state about the Last Modified timestamp of the newest file - // that we pulled in. We do this in order to avoid pulling in the same file twice. - // However, we want to save the state both locally and remotely. - // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the - // previously Primary Node left off. - // We also store the state locally so that if the node is restarted, and the node cannot contact - // the distributed state cache, the node can continue to run (if it is primary node). 
- final Set<String> identifiers = new HashSet<>(newEntries.size()); - try { - for (final T entity : newEntries) { - identifiers.add(entity.getIdentifier()); + // As long as we have a listing timestamp, there is meaningful state to capture regardless of any outputs generated + if (latestListingTimestamp != null) { + boolean processedNewFiles = flowfilesCreated > 0; + if (processedNewFiles) { + // If there have been files created, update the last timestamp we processed + lastProcessedTime = orderedEntries.lastKey(); + getLogger().info("Successfully created listing with {} new objects", new Object[]{flowfilesCreated}); + session.commit(); + } + + lastRunTime = System.currentTimeMillis(); + + if (!latestListingTimestamp.equals(lastListingTime) || processedNewFiles) { + // We have performed a listing and pushed any FlowFiles out that may have been generated + // Now, we need to persist state about the Last Modified timestamp of the newest file + // that we evaluated. We do this in order to avoid pulling in the same file twice. + // However, we want to save the state both locally and remotely. + // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the + // previously Primary Node left off. + // We also store the state locally so that if the node is restarted, and the node cannot contact + // the distributed state cache, the node can continue to run (if it is primary node). + try { + lastListingTime = latestListingTimestamp; + persist(latestListingTimestamp, lastProcessedTime, context.getStateManager(), getStateScope(context)); + } catch (final IOException ioe) { + getLogger().warn("Unable to save state due to {}. 
If NiFi is restarted before state is saved, or " + + "if another node begins executing this Processor, data duplication may occur.", ioe); } - persist(latestListingTimestamp, identifiers, context.getStateManager(), getStateScope(context)); - } catch (final IOException ioe) { - getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or " - + "if another node begins executing this Processor, data duplication may occur.", ioe); } - lastListingTime = latestListingTimestamp; - latestIdentifiersListed = identifiers; } else { getLogger().debug("There is no data to list. Yielding."); context.yield(); @@ -430,13 +444,18 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab } } + private void resetTimeStates() { + lastListingTime = null; + lastProcessedTime = 0L; + lastRunTime = 0L; + } /** * Creates a Map of attributes that should be applied to the FlowFile to represent this entity. This processor will emit a FlowFile for each "new" entity * (see the documentation for this class for a discussion of how this class determines whether or not an entity is "new"). The FlowFile will contain no * content. The attributes that will be included are exactly the attributes that are returned by this method. * - * @param entity the entity represented by the FlowFile + * @param entity the entity represented by the FlowFile * @param context the ProcessContext for obtaining configuration information * @return a Map of attributes for this entity */ @@ -460,9 +479,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab * will be filtered out by the Processor. Therefore, it is not necessary that implementations perform this filtering but can be more efficient * if the filtering can be performed on the server side prior to retrieving the information. 
* - * @param context the ProcessContex to use in order to pull the appropriate entities + * @param context the ProcessContex to use in order to pull the appropriate entities * @param minTimestamp the minimum timestamp of entities that should be returned. - * * @return a Listing of entities that have a timestamp >= minTimestamp */ protected abstract List<T> performListing(final ProcessContext context, final Long minTimestamp) throws IOException; http://git-wip-us.apache.org/repos/asf/nifi/blob/1a512cd1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java index e82be2c..b64e80d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java @@ -95,8 +95,8 @@ import org.apache.nifi.processors.standard.util.FileInfo; "rw-rw-r--") }) @SeeAlso({GetFile.class, PutFile.class, FetchFile.class}) -@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored, " - + "along with the filenames of all files that share that same timestamp. This allows the Processor to list only files that have been added or modified after " +@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored. 
" + + "This allows the Processor to list only files that have been added or modified after " + "this date the next time that the Processor is run. Whether the state is stored with a Local or Cluster scope depends on the value of the " + "<Input Directory Location> property.") public class ListFile extends AbstractListProcessor<FileInfo> { http://git-wip-us.apache.org/repos/asf/nifi/blob/1a512cd1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java index d92f398..f2df7da 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java @@ -51,8 +51,8 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer; @WritesAttribute(attribute = "filename", description = "The name of the file on the SFTP Server"), @WritesAttribute(attribute = "path", description = "The fully qualified name of the directory on the SFTP Server from which the file was pulled"), }) -@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored, " - + "along with the filename all files that share that same timestamp. This allows the Processor to list only files that have been added or modified after " +@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored. 
" + + "This allows the Processor to list only files that have been added or modified after " + "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if " + "a new Primary Node is selected, the new node will not duplicate the data that was listed by the previous Primary Node.") public class ListSFTP extends ListFileTransfer { http://git-wip-us.apache.org/repos/asf/nifi/blob/1a512cd1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java index 7544eb8..a85ff2a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java @@ -18,7 +18,6 @@ package org.apache.nifi.processors.standard; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import java.io.File; import java.io.FileOutputStream; @@ -55,61 +54,127 @@ public class TestAbstractListProcessor { public final TemporaryFolder testFolder = new TemporaryFolder(); @Test - public void testOnlyNewEntriesEmitted() { + public void testOnlyNewEntriesEmitted() throws Exception { final ConcreteListProcessor proc = new ConcreteListProcessor(); final TestRunner runner = TestRunners.newTestRunner(proc); runner.run(); + final long initialTimestamp = System.currentTimeMillis(); + 
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - proc.addEntity("name", "id", 1492L); + proc.addEntity("name", "id", initialTimestamp); + proc.addEntity("name", "id2", initialTimestamp); runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); + // First run, the above listed entries would be skipped to avoid write synchronization issues + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); runner.clearTransferState(); - proc.addEntity("name", "id2", 1492L); + // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + // Running again, our two previously seen files are now cleared to be released runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + runner.clearTransferState(); + + // Verify no new old files show up + proc.addEntity("name", "id2", initialTimestamp); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); runner.clearTransferState(); - proc.addEntity("name", "id2", 1492L); + proc.addEntity("name", "id3", initialTimestamp - 1); runner.run(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); runner.clearTransferState(); - proc.addEntity("name", "id3", 1491L); + proc.addEntity("name", "id2", initialTimestamp); runner.run(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); runner.clearTransferState(); - proc.addEntity("name", "id2", 1492L); + // Now a new file beyond the current time enters + proc.addEntity("name", "id2", initialTimestamp + 1); + + // Nothing occurs for the first iteration as it is withheld runner.run(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); runner.clearTransferState(); - 
proc.addEntity("name", "id2", 1493L); + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + // But it should now show up as the appropriate pause has elapsed runner.run(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); runner.clearTransferState(); + } + + @Test + public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() throws Exception { + final ConcreteListProcessor proc = new ConcreteListProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + + final long initialTimestamp = System.currentTimeMillis(); + + proc.addEntity("name", "id", initialTimestamp); + proc.addEntity("name", "id2", initialTimestamp); + + // Emulate having state but not having had the processor run such as in a restart + final Map<String, String> preexistingState = new HashMap<>(); + preexistingState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, Long.toString(initialTimestamp)); + preexistingState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, Long.toString(initialTimestamp)); + runner.getStateManager().setState(preexistingState, Scope.CLUSTER); + + // run for the first time + runner.run(); + + // First run, the above listed entries would be skipped + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + // Running again, these files should be eligible for transfer and again skipped + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Verify no new old files show up + proc.addEntity("name", "id2", initialTimestamp); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + proc.addEntity("name", "id3", 
initialTimestamp - 1); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); - proc.addEntity("name", "id2", 1493L); + proc.addEntity("name", "id2", initialTimestamp); runner.run(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); runner.clearTransferState(); - proc.addEntity("name", "id2", 1493L); + // Now a new file beyond the current time enters + proc.addEntity("name", "id2", initialTimestamp + 1); + + // Nothing occurs for the first iteration as it is withheld runner.run(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); runner.clearTransferState(); - proc.addEntity("name", "id", 1494L); + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + // But it should now show up as the appropriate pause has elapsed runner.run(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); runner.clearTransferState(); } @Test - public void testStateStoredInClusterStateManagement() throws InitializationException { + public void testStateStoredInClusterStateManagement() throws Exception { final ConcreteListProcessor proc = new ConcreteListProcessor(); final TestRunner runner = TestRunners.newTestRunner(proc); final DistributedCache cache = new DistributedCache(); @@ -123,8 +188,17 @@ public class TestAbstractListProcessor { runner.run(); final Map<String, String> expectedState = new HashMap<>(); - expectedState.put(AbstractListProcessor.TIMESTAMP, "1492"); - expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id"); + // Ensure only timestamp is migrated + expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492"); + expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "0"); + runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS); + + runner.run(); + // Ensure only timestamp is migrated + 
expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492"); + expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492"); runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); } @@ -145,8 +219,9 @@ public class TestAbstractListProcessor { final MockStateManager stateManager = runner.getStateManager(); final Map<String, String> expectedState = new HashMap<>(); - expectedState.put(AbstractListProcessor.TIMESTAMP, "1492"); - expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id"); + // Ensure only timestamp is migrated + expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492"); + expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492"); stateManager.assertStateEquals(expectedState, Scope.CLUSTER); } @@ -188,13 +263,59 @@ public class TestAbstractListProcessor { // Verify the state manager now maintains the associated state final Map<String, String> expectedState = new HashMap<>(); - expectedState.put(AbstractListProcessor.TIMESTAMP, "1492"); - expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id"); - + // Ensure only timestamp is migrated + expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492"); + expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492"); runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); } @Test + public void testResumeListingAfterClearingState() throws Exception { + final ConcreteListProcessor proc = new ConcreteListProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + + final long initialEventTimestamp = System.currentTimeMillis(); + proc.addEntity("name", "id", initialEventTimestamp); + proc.addEntity("name", "id2", initialEventTimestamp); + + // Add entities but these should not be transferred as they are the latest values + runner.run(); + 
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + + // after providing a pause in listings, the files should now transfer + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + runner.clearTransferState(); + + // Verify entities are not transferred again for the given state + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Clear state for this processor, eradicating timestamp + runner.getStateManager().clear(Scope.CLUSTER); + Assert.assertEquals("State is not empty for this component after clearing", 0, runner.getStateManager().getState(Scope.CLUSTER).toMap().size()); + + + // As before, we are unsure of when these files were delivered relative to system time, and additional cycle(s) need to occur before transfer + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + // Ensure the original files are now transferred again. 
+ runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + runner.clearTransferState(); + } + + @Test public void testFetchOnStart() throws InitializationException { final ConcreteListProcessor proc = new ConcreteListProcessor(); final TestRunner runner = TestRunners.newTestRunner(proc); @@ -209,41 +330,65 @@ public class TestAbstractListProcessor { } @Test - public void testOnlyNewStateStored() throws IOException { + public void testOnlyNewStateStored() throws Exception { final ConcreteListProcessor proc = new ConcreteListProcessor(); final TestRunner runner = TestRunners.newTestRunner(proc); runner.run(); + final long initialTimestamp = System.currentTimeMillis(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - proc.addEntity("name", "id", 1492L); - proc.addEntity("name", "id2", 1492L); + proc.addEntity("name", "id", initialTimestamp); + proc.addEntity("name", "id2", initialTimestamp); + + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); runner.run(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); runner.clearTransferState(); final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); - assertEquals(1, stateMap.getVersion()); + assertEquals(2, stateMap.getVersion()); final Map<String, String> map = stateMap.toMap(); - assertEquals(3, map.size()); - assertEquals("1492", map.get("timestamp")); - assertTrue(map.containsKey("id.1")); - assertTrue(map.containsKey("id.2")); + // Ensure only timestamp is migrated + assertEquals(2, map.size()); + assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY)); + assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY)); - proc.addEntity("new name", "new id", 1493L); + proc.addEntity("new name", 
"new id", initialTimestamp + 1); runner.run(); + // Verify that the new entry has not been emitted but it has triggered an updated state + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER); + assertEquals(3, updatedStateMap.getVersion()); + + assertEquals(2, updatedStateMap.toMap().size()); + assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY)); + // Processed timestamp is lagging behind currently + assertEquals(Long.toString(initialTimestamp), updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY)); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); - final StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER); - assertEquals(2, updatedStateMap.getVersion()); + runner.clearTransferState(); - final Map<String, String> updatedValues = updatedStateMap.toMap(); - assertEquals(2, updatedValues.size()); - assertEquals("1493", updatedValues.get("timestamp")); - assertEquals("new id", updatedValues.get("id.1")); - } + updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER); + assertEquals(4, updatedStateMap.getVersion()); + assertEquals(2, updatedStateMap.toMap().size()); + assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY)); + // Processed timestamp is now caught up + assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY)); + } private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient { private final Map<Object, Object> stored = new HashMap<>(); 
http://git-wip-us.apache.org/repos/asf/nifi/blob/1a512cd1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java index 93319d0..441a561 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java @@ -45,7 +45,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -public class TestListFile { +public class TestListFile { final String TESTDIR = "target/test/data/in"; final File testDir = new File(TESTDIR); @@ -112,9 +112,16 @@ public class TestListFile { // process first file and set new timestamp runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.run(); + + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + runner.clearTransferState(); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); - final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); - assertEquals(1, successFiles.size()); + final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(1, successFiles1.size()); // create second file final File file2 = new File(TESTDIR + "/listing2.txt"); @@ -124,6 +131,12 @@ public class TestListFile { // process second file after timestamp runner.clearTransferState(); runner.run(); + 
runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + runner.clearTransferState(); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(1, successFiles2.size()); @@ -147,11 +160,18 @@ public class TestListFile { runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); - assertEquals(3, successFiles4.size()); + assertEquals(2, successFiles4.size()); + + runner.clearTransferState(); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 1); } @Test - public void testFilterAge() throws IOException { + public void testFilterAge() throws Exception { final File file1 = new File(TESTDIR + "/age1.txt"); assertTrue(file1.createNewFile()); assertTrue(file1.setLastModified(time0millis)); @@ -167,9 +187,15 @@ public class TestListFile { // check all files runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.run(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 2); + runner.clearTransferState(); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); - assertEquals(3, successFiles1.size()); + assertEquals(1, successFiles1.size()); // exclude oldest runner.clearTransferState(); @@ -178,7 +204,13 @@ public class TestListFile { runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); - assertEquals(2, successFiles2.size()); + assertEquals(1, successFiles2.size()); + 
runner.clearTransferState(); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 1); // exclude newest runner.clearTransferState(); @@ -187,7 +219,7 @@ public class TestListFile { runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); - assertEquals(2, successFiles3.size()); + assertEquals(1, successFiles3.size()); // exclude oldest and newest runner.clearTransferState(); @@ -196,11 +228,18 @@ public class TestListFile { runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); - assertEquals(1, successFiles4.size()); + assertEquals(0, successFiles4.size()); + runner.clearTransferState(); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + runner.assertTransferCount(ListFile.REL_SUCCESS, 1); } @Test - public void testFilterSize() throws IOException { + public void testFilterSize() throws Exception { final byte[] bytes1000 = new byte[1000]; final byte[] bytes5000 = new byte[5000]; final byte[] bytes10000 = new byte[10000]; @@ -227,6 +266,13 @@ public class TestListFile { // check all files runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.run(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + runner.clearTransferState(); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(3, successFiles1.size()); @@ -238,6 +284,14 @@ public class TestListFile { runner.setProperty(ListFile.MIN_SIZE, "0 b"); runner.setProperty(ListFile.MAX_SIZE, "7500 
b"); runner.run(); + + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + runner.clearTransferState(); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(2, successFiles2.size()); @@ -249,6 +303,12 @@ public class TestListFile { runner.setProperty(ListFile.MIN_SIZE, "2500 b"); runner.removeProperty(ListFile.MAX_SIZE); runner.run(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + runner.clearTransferState(); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(2, successFiles3.size()); @@ -260,13 +320,20 @@ public class TestListFile { runner.setProperty(ListFile.MIN_SIZE, "2500 b"); runner.setProperty(ListFile.MAX_SIZE, "7500 b"); runner.run(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + runner.clearTransferState(); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(1, successFiles4.size()); } @Test - public void testFilterHidden() throws IOException { + public void testFilterHidden() throws Exception { FileOutputStream fos; final File file1 = new File(TESTDIR + "/hidden1.txt"); @@ -293,6 +360,11 @@ public class TestListFile { runner.removeProperty(ListFile.MAX_SIZE); runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "false"); runner.run(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final 
List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(2, successFiles1.size()); @@ -301,13 +373,18 @@ public class TestListFile { runner.clearTransferState(); runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "true"); runner.run(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(1, successFiles2.size()); } @Test - public void testFilterFilePattern() throws IOException { + public void testFilterFilePattern() throws Exception { final File file1 = new File(TESTDIR + "/file1-abc-apple.txt"); assertTrue(file1.createNewFile()); @@ -325,6 +402,11 @@ public class TestListFile { runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue()); runner.run(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(4, successFiles1.size()); @@ -333,13 +415,18 @@ public class TestListFile { runner.clearTransferState(); runner.setProperty(ListFile.FILE_FILTER, ".*-xyz-.*"); runner.run(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(2, successFiles2.size()); } @Test - public void testFilterPathPattern() throws IOException { + public void testFilterPathPattern() throws Exception { final 
File subdir1 = new File(TESTDIR + "/subdir1"); assertTrue(subdir1.mkdirs()); @@ -363,6 +450,11 @@ public class TestListFile { runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue()); runner.setProperty(ListFile.RECURSE, "true"); runner.run(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(4, successFiles1.size()); @@ -372,6 +464,11 @@ public class TestListFile { runner.setProperty(ListFile.PATH_FILTER, "subdir1"); runner.setProperty(ListFile.RECURSE, "true"); runner.run(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(3, successFiles2.size()); @@ -381,13 +478,18 @@ public class TestListFile { runner.setProperty(ListFile.PATH_FILTER, "subdir2"); runner.setProperty(ListFile.RECURSE, "true"); runner.run(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(1, successFiles3.size()); } @Test - public void testRecurse() throws IOException { + public void testRecurse() throws Exception { final File subdir1 = new File(TESTDIR + "/subdir1"); assertTrue(subdir1.mkdirs()); @@ -408,6 +510,12 @@ public class TestListFile { runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.RECURSE, "true"); runner.run(); + 
runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + runner.clearTransferState(); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 3); final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); for (final MockFlowFile mff : successFiles1) { @@ -435,13 +543,21 @@ public class TestListFile { runner.clearTransferState(); runner.setProperty(ListFile.RECURSE, "false"); runner.run(); + + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + runner.clearTransferState(); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(1, successFiles2.size()); } @Test - public void testReadable() throws IOException { + public void testReadable() throws Exception { final File file1 = new File(TESTDIR + "/file1.txt"); assertTrue(file1.createNewFile()); @@ -456,13 +572,18 @@ public class TestListFile { runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.setProperty(ListFile.RECURSE, "true"); runner.run(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + runner.clearTransferState(); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); - final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); - assertEquals(3, successFiles1.size()); + runner.assertTransferCount(ListFile.REL_SUCCESS, 3); } @Test - public void testAttributesSet() throws IOException { + public void testAttributesSet() throws Exception { // create temp file and time constant final File file1 = new File(TESTDIR + "/file1.txt"); assertTrue(file1.createNewFile()); @@ -477,6 +598,13 @@ public class TestListFile { 
runner.clearTransferState(); runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); runner.run(); + runner.assertTransferCount(ListFile.REL_SUCCESS, 0); + runner.clearTransferState(); + + Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS * 2); + + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); assertEquals(1, successFiles1.size());