[ 
https://issues.apache.org/jira/browse/NIFI-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855371#comment-15855371
 ] 

Koji Kawamura commented on NIFI-3414:
-------------------------------------

Thanks, Matt, for explaining the detailed use case. (I thought I had written a 
reply last week, but the comment wasn't saved properly.)

I'm going to implement the processor and add the following capabilities as well:

- To avoid updating state storage too often, batch incoming flow files, up to 
'Max Batch Count', and process them at once.
- Group incoming flow files by 'Group Identifier', and handle multiple groups.
- Add a 'wait' queue for flow files that don't match the current order number, 
so that they leave the incoming queue and make room for other flow files to be 
enqueued. Penalize them, too.
- Track how long a flow file has been waiting. If the wait exceeds the 
configured 'Timeout', route the flow file to the 'timeout' relationship. Update 
the order number to the maximum order number among the timed-out flow files 
plus 1, so that if a skipped flow file arrives later, the processor can detect 
it.
- If an incoming flow file has an order number younger (lower) than the current 
one, route it to the 'skipped' relationship.
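
To make the intended routing concrete, here is a rough sketch in plain Java. It 
does not use the NiFi API: the class EnforceOrderSketch, the Item type, and the 
in-memory map standing in for state storage are all hypothetical names chosen 
for illustration, and the real processor may well end up looking different.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: not the NiFi API, just the routing rules from the list above.
class EnforceOrderSketch {
    enum Route { SUCCESS, WAIT, TIMEOUT, SKIPPED }

    // Hypothetical stand-in for a flow file with its group, order number, and wait time.
    static final class Item {
        final String group;
        final int order;
        final long waitedMillis;
        Route route; // filled in by processBatch

        Item(String group, int order, long waitedMillis) {
            this.group = group;
            this.order = order;
            this.waitedMillis = waitedMillis;
        }
    }

    // Assumption: an in-memory map plays the role of the state storage.
    private final Map<String, Integer> currentOrder = new HashMap<>();
    private final int initialOrder;
    private final long timeoutMillis;
    private final int maxBatchCount;

    EnforceOrderSketch(int initialOrder, long timeoutMillis, int maxBatchCount) {
        this.initialOrder = initialOrder;
        this.timeoutMillis = timeoutMillis;
        this.maxBatchCount = maxBatchCount;
    }

    int currentOrderOf(String group) {
        return currentOrder.getOrDefault(group, initialOrder);
    }

    // Routes at most maxBatchCount items from the head of the queue in one pass.
    void processBatch(List<Item> queue) {
        int size = Math.min(maxBatchCount, queue.size());
        List<Item> batch = new ArrayList<>(queue.subList(0, size));
        // Visit lower order numbers first so consecutive items can be released together.
        batch.sort(Comparator.comparingInt((Item i) -> i.order));

        Map<String, Integer> maxTimedOut = new HashMap<>();
        for (Item item : batch) {
            int current = currentOrderOf(item.group);
            if (item.order == current) {
                item.route = Route.SUCCESS;
                currentOrder.put(item.group, current + 1);
            } else if (item.order < current) {
                item.route = Route.SKIPPED; // arrived after its turn had passed
            } else if (item.waitedMillis > timeoutMillis) {
                item.route = Route.TIMEOUT;
                maxTimedOut.merge(item.group, item.order, Math::max);
            } else {
                item.route = Route.WAIT; // would also be penalized
            }
        }
        // Move past timed-out items: next expected order = max timed-out order + 1.
        maxTimedOut.forEach((group, max) -> currentOrder.put(group, max + 1));
    }

    public static void main(String[] args) {
        EnforceOrderSketch sketch = new EnforceOrderSketch(1, 1000L, 10);
        List<Item> queue = Arrays.asList(
                new Item("g", 2, 0L), new Item("g", 1, 0L), new Item("g", 5, 2000L));
        sketch.processBatch(queue);
        for (Item item : queue) {
            System.out.println(item.group + "#" + item.order + " -> " + item.route);
        }
        System.out.println("next order for g: " + sketch.currentOrderOf("g"));
    }
}
```

Sorting the batch by order number is a choice made for this sketch, so that 
flow files which are consecutive within one batch can all go to 'success' in a 
single pass, and the state map only needs to be written once per batch.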

> Implement an EnforceOrder processor
> -----------------------------------
>
>                 Key: NIFI-3414
>                 URL: https://issues.apache.org/jira/browse/NIFI-3414
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Matt Burgess
>            Assignee: Koji Kawamura
>
> For some flows, it is imperative that the flow files are processed in a 
> certain order.  The PriorityAttributePrioritizer can be used on a connection 
> to ensure that flow files going through that connection are in priority 
> order, but depending on error-handling, branching, and other flow designs, it 
> is possible for flow files to get out-of-order.
> I propose an EnforceOrder processor, which would be single-threaded and have 
> (at a minimum) the following properties:
> 1) Order Attribute: This would be the name of a flow file attribute from 
> which the current value will be retrieved.
> 2) Initial Value: This property specifies an initial value for the order. The 
> processor is stateful, however, so this property is only used when there is 
> no entry in the state map for current value.
> The processor would store the Initial Value into the state map (if no state 
> map entry exists), then for each incoming flow file, it checks the value in 
> the Order Attribute against the current value.  If the attribute value 
> matches the current value, the flow file is transferred to the "success" 
> relationship, and the current value is incremented in the state map. If the 
> attribute value does not match the current value, the session will be rolled 
> back.
> Using this processor, along with a PriorityAttributePrioritizer on the 
> incoming connection, will allow for out-of-order flow files to have a sort of 
> "barrier", thereby guaranteeing that flow files transferred to the "success" 
> relationship are in the specified order.
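
The barrier behaviour described in the issue can be sketched in plain Java as 
follows. This is only an illustration under stated assumptions: 
OrderBarrierSketch is a hypothetical class, a java.util.PriorityQueue (smallest 
order first) stands in for the prioritized incoming connection, and stopping at 
the first mismatch stands in for rolling back the session.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative only: models the "barrier" from the issue description without NiFi.
class OrderBarrierSketch {
    // Assumption: the prioritized connection hands over the smallest order first.
    private final PriorityQueue<Integer> incoming = new PriorityQueue<>();
    private final int initialValue;
    private int current;
    private boolean initialized; // false until the "state map" has an entry

    OrderBarrierSketch(int initialValue) {
        this.initialValue = initialValue;
    }

    void enqueue(int order) {
        incoming.add(order);
    }

    // One processor run: release matching flow files, stop at the first mismatch.
    List<Integer> trigger() {
        if (!initialized) {
            current = initialValue; // Initial Value is used only when no state exists
            initialized = true;
        }
        List<Integer> success = new ArrayList<>();
        while (!incoming.isEmpty() && incoming.peek() == current) {
            success.add(incoming.poll());
            current++;
        }
        // Anything left stays queued, which is what a session rollback would achieve.
        return success;
    }

    public static void main(String[] args) {
        OrderBarrierSketch barrier = new OrderBarrierSketch(1);
        barrier.enqueue(3);
        barrier.enqueue(1);
        System.out.println("first run:  " + barrier.trigger());
        barrier.enqueue(2);
        System.out.println("second run: " + barrier.trigger());
    }
}
```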


