[
https://issues.apache.org/jira/browse/NIFI-6831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jon Kessler reassigned NIFI-6831:
---------------------------------
Assignee: Jon Kessler
> Create a flowfile queue implementation with global data priority awareness
> --------------------------------------------------------------------------
>
> Key: NIFI-6831
> URL: https://issues.apache.org/jira/browse/NIFI-6831
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Core Framework
> Affects Versions: 1.11.0
> Reporter: Jon Kessler
> Assignee: Jon Kessler
> Priority: Major
>
> There is currently no way to process data in order by priority on a flow-wide
> or global scale. There are several issues with the way sorting by priority
> attribute is currently done in the framework that I believe we can address
> with a new flowfile queue implementation. Those shortcomings are:
> * Scheduling: No consideration is given to data priority when determining
> which component is given the next available thread with which to work.
> * Constant sorting: Because all FlowFiles in a given connection share the
> same PriorityQueue, they must be sorted every time they move. While this sort
> is efficient, its cost adds up as queues grow deep.
> * Administration: There is a costly human element to managing the value used
> as a priority ranking as priorities change. You must also ensure every
> connection in the appropriate flow has the proper prioritizer assigned to it
> to make use of the property.
> The design goals of this new priority mechanism and flowfile queue
> implementation are:
> * Instead of using the value of a FlowFile attribute as a ranking, maintain
> a set of expression language rules to define your priorities. The highest
> ranked rule that a given FlowFile satisfies will be that FlowFile's priority.
> * Because we have a finite set of priority rules we can utilize a bucket
> sort in our connections, with one bucket per priority rule. The bucket/rule
> with which a FlowFile is associated will be maintained so that as it moves
> through the system we do not have to re-evaluate that FlowFile against our
> ruleset unless we have reason to do so.
> * Control where in your flow FlowFiles are evaluated against the ruleset
> with a new Prioritizer implementation: BucketPrioritizer.
> * When this queue implementation is polled it will be able to check state to
> see if any data of a higher priority than what it currently contains recently
> (within 5s) moved elsewhere in the system. If higher priority data has
> recently moved elsewhere, the connection will only provide a FlowFile X% of
> the time where X is defined along with the rule. This allows higher priority
> data to have more frequent access to threads without thread-starving lower
> priority data.
> * Rules will be managed via a menu option for the flow and changes to them
> take effect instantly. This allows you to change your priorities without
> stopping/editing/restarting various components on the graph.
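The design goals above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the proposed implementation: RuleSketch and BucketQueueSketch are hypothetical names, a Predicate stands in for an expression language rule, the last rule is assumed to be a catch-all, and the yield percentage X is assumed to come from the rule of the data the queue is about to hand out. The caller is assumed to supply what the shared state would report (the best bucket that moved elsewhere and when).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.DoubleSupplier;
import java.util.function.Predicate;

// Hypothetical stand-in for one priority rule. A Predicate replaces the
// expression language rule; yieldPercent is the "X" defined with the rule.
final class RuleSketch {
    final Predicate<Map<String, String>> matcher;
    final int yieldPercent;

    RuleSketch(Predicate<Map<String, String>> matcher, int yieldPercent) {
        this.matcher = matcher;
        this.yieldPercent = yieldPercent;
    }
}

// Hypothetical connection queue: one FIFO bucket per rule, rule 0 ranks highest.
final class BucketQueueSketch {
    static final long RECENT_WINDOW_MS = 5_000L; // the "within 5s" window

    private final List<RuleSketch> rules;
    private final List<Queue<Map<String, String>>> buckets = new ArrayList<>();
    private final DoubleSupplier random; // injected so the behavior is testable

    BucketQueueSketch(List<RuleSketch> rules, DoubleSupplier random) {
        this.rules = rules;
        this.random = random;
        for (int i = 0; i < rules.size(); i++) {
            buckets.add(new ArrayDeque<>());
        }
    }

    // Index of the highest-ranked rule the attributes satisfy.
    // Assumption: the last rule is a catch-all, used when nothing matches.
    int evaluate(Map<String, String> attributes) {
        for (int i = 0; i < rules.size(); i++) {
            if (rules.get(i).matcher.test(attributes)) {
                return i;
            }
        }
        return rules.size() - 1;
    }

    // Bucket sort on insertion: an append to a FIFO, no comparison sort.
    void enqueue(Map<String, String> attributes, int bucket) {
        buckets.get(bucket).add(attributes);
    }

    // bestBucketElsewhere: highest-priority bucket that moved elsewhere, or -1.
    // Returns null when the queue is empty or yields this turn.
    Map<String, String> poll(int bestBucketElsewhere, long movedAtMs, long nowMs) {
        for (int i = 0; i < buckets.size(); i++) {
            Queue<Map<String, String>> bucket = buckets.get(i);
            if (bucket.isEmpty()) {
                continue;
            }
            boolean outranked = bestBucketElsewhere >= 0
                    && bestBucketElsewhere < i
                    && nowMs - movedAtMs <= RECENT_WINDOW_MS;
            // Provide data only X% of the time while outranked elsewhere.
            if (outranked && random.getAsDouble() * 100 >= rules.get(i).yieldPercent) {
                return null;
            }
            return bucket.poll();
        }
        return null;
    }
}
```

Because X is a percentage rather than a hard block, lower priority data still gets threads some of the time, which is how the design avoids starving it.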
> Additional design considerations:
> The sorting function here takes place on insertion into any connection on
> which a BucketPrioritizer is set. Once a FlowFile has been sorted into a
> bucket we maintain that state so that each time it moves into a new
> connection we already know in which bucket it should be placed without
> needing to have a BucketPrioritizer set on that connection. Each bucket in a
> connection is just a FIFO queue so no additional sorting is done. You should
> only have to configure connections to use the BucketPrioritizer at points in
> your flow where you believe you'll have enough information to accurately
> determine priority but not beyond that point unless you want to re-evaluate
> downstream for some reason. There is administration involved in setting these
> BucketPrioritizers on some connections but it should be minimal per flow
> (sometimes as few as one).
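To illustrate how the bucket assignment travels with a FlowFile, here is a small sketch. All names (TaggedFlowFile, TaggingConnection, the UNEVALUATED bucket, the evaluations counter) are hypothetical and exist only for this example; Predicates again stand in for expression language rules, and the last rule is assumed to be a catch-all. A connection evaluates the ruleset only when a BucketPrioritizer is set on it; otherwise it reuses whatever bucket the FlowFile already carries.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical FlowFile that remembers the bucket it was sorted into.
final class TaggedFlowFile {
    final Map<String, String> attributes;
    Integer bucket; // null until some BucketPrioritizer has evaluated it

    TaggedFlowFile(Map<String, String> attributes) {
        this.attributes = attributes;
    }
}

// Hypothetical connection holding one FIFO queue per bucket.
final class TaggingConnection {
    static final int UNEVALUATED = -1; // assumed holding bucket for untagged data

    private final List<Predicate<Map<String, String>>> rules; // index 0 ranks highest
    private final boolean bucketPrioritizerSet;
    private final Map<Integer, Queue<TaggedFlowFile>> buckets = new HashMap<>();
    int evaluations = 0; // counts ruleset evaluations, for demonstration only

    TaggingConnection(List<Predicate<Map<String, String>>> rules,
                      boolean bucketPrioritizerSet) {
        this.rules = rules;
        this.bucketPrioritizerSet = bucketPrioritizerSet;
    }

    void insert(TaggedFlowFile flowFile) {
        // Only a connection with a BucketPrioritizer (re-)evaluates the ruleset.
        if (bucketPrioritizerSet) {
            flowFile.bucket = evaluate(flowFile.attributes);
        }
        // Everywhere else the remembered bucket is reused as-is.
        int key = (flowFile.bucket == null) ? UNEVALUATED : flowFile.bucket;
        buckets.computeIfAbsent(key, k -> new ArrayDeque<>()).add(flowFile);
    }

    private int evaluate(Map<String, String> attributes) {
        evaluations++;
        for (int i = 0; i < rules.size(); i++) {
            if (rules.get(i).test(attributes)) {
                return i;
            }
        }
        return rules.size() - 1; // assumption: last rule is a catch-all
    }

    int depth(int bucket) {
        Queue<TaggedFlowFile> queue = buckets.get(bucket);
        return queue == null ? 0 : queue.size();
    }
}
```

In this sketch a downstream connection without a BucketPrioritizer files the FlowFile into the right bucket with zero rule evaluations, which is the cost saving the description is after.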
> When you delete a rule, each FlowFile that was already associated with that
> rule will be re-evaluated against the ruleset the next time it moves, when it
> enters the next connection, regardless of whether a BucketPrioritizer is set
> on that connection. Also, FlowFiles that have yet to be evaluated (have yet
> to encounter a BucketPrioritizer) will not be staggered. The design decision
> here is that if we don't know the priority of a given FlowFile, we should get
> it to the point in the flow where it is evaluated as soon as possible. This
> decision was a result of empirical evidence: when we did stagger unevaluated
> data, an incoming flow of high priority data slowed its own upstream
> processing down once it was identified and processed as high priority.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)