Joseph Witt commented on NIFI-5406:

Koji, this is a great writeup.  I think we need to:
1. Make sure to update ListFile (not create a new processor) to leverage this.
2. Instead of only updating the entry, and thus creating a listing, when the 
modified date becomes newer, track it for any change (older, newer, or size) 
within the watch window.  The watch window should be the span of time over 
which we record and track all listings; if there is any change, whether a new 
listing or a change in date/time for an existing listing, we create a listing 
result.
3. Make the DistributedMapCache optional.  If it is not provided, we use the 
old approach, which is limited but extremely fast and works very well for use 
cases where file timestamps behave as expected.  If it is provided, we use the 
slower but surer brute-force option of tracking state.
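
To make the difference in point 2 concrete, here is a minimal sketch of the two comparison rules. The class and method names (ChangeRule, Entity, changedPerProposal, changedPerComment) are made up for illustration and are not existing NiFi code; the sketch only assumes that each entity carries a path, a size, and a last-modified timestamp, as the description below states.

```java
/** Hypothetical illustration only; none of these names exist in NiFi. */
class ChangeRule {

    /** Assumed shape of a tracked entity: path, size, last-modified timestamp. */
    static final class Entity {
        final String path;
        final long size;
        final long lastModified;

        Entity(String path, long size, long lastModified) {
            this.path = path;
            this.size = size;
            this.lastModified = lastModified;
        }
    }

    /** Rule from the issue description: unseen, newer timestamp, or different size. */
    static boolean changedPerProposal(Entity previous, Entity current) {
        return previous == null
                || current.lastModified > previous.lastModified
                || current.size != previous.size;
    }

    /** Rule suggested in this comment: any difference, including an older timestamp. */
    static boolean changedPerComment(Entity previous, Entity current) {
        return previous == null
                || current.lastModified != previous.lastModified
                || current.size != previous.size;
    }

    public static void main(String[] args) {
        Entity seen = new Entity("/data/a.txt", 10L, 2_000L);
        // Same path and size, but the timestamp moved backwards.
        Entity rolledBack = new Entity("/data/a.txt", 10L, 1_000L);
        System.out.println(changedPerProposal(seen, rolledBack)); // false
        System.out.println(changedPerComment(seen, rolledBack));  // true
    }
}
```

The only case where the two rules disagree is a timestamp that moves backwards while the size stays the same, for example when a file is replaced by an older copy of the same length.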

We need to clearly document how this will behave when people want this 
processor to act like an idempotent reader indefinitely.  This could be the 
case where they point this processor at some watch directory (with or without 
recursion).  To achieve this they can set the watch window to a very large 
time span.

We need to make sure the distributed map cache clients keep only a finite 
amount of state cached locally.  That is a separate concern from the map cache 
implementation they talk to, which must persist the state for the given watch 
window and roll everything else out properly so that it does not keep an 
unbounded amount of state either.  I am writing this without knowing the 
internals of how these work today, so this might already be sorted.
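
As a rough illustration of what "finite amount of state locally cached" could mean on the client side, here is a small LRU sketch built on java.util.LinkedHashMap. BoundedLocalCache is a hypothetical name and is not part of the DistributedMapCacheClient API; how the existing clients actually cache state is not something this sketch claims to know.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical bounded local cache; not an existing NiFi class. */
class BoundedLocalCache<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;

    private final int maxEntries;

    BoundedLocalCache(int maxEntries) {
        // accessOrder = true keeps entries ordered from least to most recently used.
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each insertion; evict the least recently used entry when over the limit.
        return size() > maxEntries;
    }

    public static void main(String[] args) {
        BoundedLocalCache<String, Long> cache = new BoundedLocalCache<>(2);
        cache.put("/in/a", 1L);
        cache.put("/in/b", 2L);
        cache.get("/in/a");      // touch "/in/a" so "/in/b" becomes the eldest entry
        cache.put("/in/c", 3L);  // exceeds the limit, so "/in/b" is evicted
        System.out.println(cache.keySet()); // [/in/a, /in/c]
    }
}
```

Anything evicted locally would still have to be looked up in the remote map cache, which remains the source of truth for the watch window.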


> Add processors to list new or updated files by tracking listed entities
> -----------------------------------------------------------------------
>                 Key: NIFI-5406
>                 URL: https://issues.apache.org/jira/browse/NIFI-5406
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Koji Kawamura
>            Assignee: Koji Kawamura
>            Priority: Major
> The current List processor implementations (ListFile, ListFTP, ListSFTP, 
> etc.) rely on the file last-modified timestamp to pick new or updated files. 
> This approach is efficient and lightweight in terms of state management, 
> because it only tracks the latest modified timestamp and the last executed 
> timestamp. However, timestamps do not work as expected in some file systems, 
> causing List processors to miss files periodically. See the NIFI-3332 
> comments for details.
> In order to pick every entity that has not been seen before or has been 
> updated since it was last seen, we need another set of processors using a 
> different approach, namely tracking listed entities:
>  * Add a new abstract processor, AbstractWatchEntries, similar to 
> AbstractListProcessor but using a different approach
>  * Target entities have: name (path), size and last-modified-timestamp
>  * Implementation Processors have the following properties:
>  ** 'Watch Time Window' to limit the maximum time period to hold the already 
> listed entries. E.g. if set to '30min', the processor keeps entities listed 
> in the last 30 mins.
>  ** 'Minimum File Age' to defer listing entities potentially being written
>  * Any entity that has never been listed and whose last-modified-timestamp 
> is older than the configured 'Watch Time Window' will not be listed. If 
> users need to pick up these items, they have to make the 'Watch Time Window' 
> longer, which also increases the size of the data the processor has to 
> persist in the K/V store. This is an efficiency vs reliability trade-off.
>  * The already-listed entities are persisted into one of the supported K/V 
> stores through the DistributedMapCacheClient service. Users can choose which 
> KVS to use from HBase, Redis, Couchbase and File (DistributedMapCacheServer 
> with persistence file).
>  * The reason to use a KVS instead of ManagedState is to avoid hammering 
> ZooKeeper by frequently updating a ZK node with a large amount of data. The 
> number of already-listed entries can be huge depending on the use case. 
> Also, we can compress entities with DistributedMapCacheClient as it supports 
> putting byte arrays, while ManagedState only supports Map<String, String>.
>  * On each onTrigger:
>  ** The processor performs a listing. Listed entries meeting any of the 
> following conditions will be written to the 'success' output FlowFile:
>  *** Does not exist in the already-listed entities
>  *** Has a newer last-modified-timestamp
>  *** Has a different size
>  ** Already-listed entries that are old enough compared to the 'Watch Time 
> Window' are discarded from the already-listed entries.
>  * The initial supported targets are the local file system, FTP and SFTP
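
The onTrigger steps in the quoted description can be sketched roughly as follows. This is not the proposed AbstractWatchEntries implementation: EntityTracker and its members are hypothetical names, an in-memory HashMap stands in for the K/V store behind DistributedMapCacheClient, 'Minimum File Age' is left out, and eviction is assumed to be based on each entry's last-modified timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of entity tracking; not an existing NiFi class. */
class EntityTracker {

    /** Assumed shape of a tracked entity: path, size, last-modified timestamp. */
    static final class Entity {
        final String path;
        final long size;
        final long lastModified;

        Entity(String path, long size, long lastModified) {
            this.path = path;
            this.size = size;
            this.lastModified = lastModified;
        }
    }

    // Stand-in for the already-listed entities persisted in the K/V store.
    private final Map<String, Entity> alreadyListed = new HashMap<>();
    private final long watchWindowMillis;

    EntityTracker(long watchWindowMillis) {
        this.watchWindowMillis = watchWindowMillis;
    }

    /** Returns the entities that would go to 'success' for one trigger. */
    List<Entity> onTrigger(List<Entity> listing, long nowMillis) {
        long oldestTracked = nowMillis - watchWindowMillis;
        List<Entity> toEmit = new ArrayList<>();
        for (Entity current : listing) {
            if (current.lastModified < oldestTracked) {
                continue; // older than the watch window: neither tracked nor listed
            }
            Entity previous = alreadyListed.get(current.path);
            boolean changed = previous == null
                    || current.lastModified > previous.lastModified
                    || current.size != previous.size;
            if (changed) {
                toEmit.add(current);
                alreadyListed.put(current.path, current);
            }
        }
        // Discard tracked entries that have fallen out of the watch window.
        alreadyListed.values().removeIf(e -> e.lastModified < oldestTracked);
        return toEmit;
    }

    int trackedCount() {
        return alreadyListed.size();
    }
}
```

The tracked state grows with the number of entities modified inside the window, which is the efficiency vs reliability trade-off the description mentions.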

