This is an interesting idea to solve an admitted problem, but I wonder how it 
comports with the core tenets of Flow Based Programming on which NiFi is 
modeled. This seems to introduce globally-coupled dependencies between all 
queues in a flow, where another solution (flow segment-based resource 
allocation) might solve this problem without requiring per-queue contention on 
every cycle. I think with the stateless NiFi work there has been some 
discussion around being able to control the resource allocation for a flow 
segment. Sam Hjemfelt, any thoughts here?


Andy LoPresto
[email protected]
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69

> On Oct 17, 2019, at 8:54 AM, Kessler, Jon <[email protected]> wrote:
> 
> Joe, hopefully I addressed all of your questions:
> 
> Your interpretation of the scheduling aspect is correct. These queues will 
> pretend to be empty a certain % of the time if higher priority data recently 
> moved elsewhere. That % is configurable on a per rule basis which allows the 
> operator to determine how much to stagger the data associated with each rule. 
> That % is also how the rules are ranked in terms of order of priority. The 
> higher the %, the more often a rule will make use of its threads so the 
> higher its priority is considered to be.
> 
> Administration: You are correct that the ruleset is provided at the flow 
> controller level but will be leveraged by all connections regardless of 
> whether or not they use the BucketPrioritizer (more details on this below). 
> This overall solution only works if all FlowFileQueues are of this new 
> implementation which is why I tied it to nifi.properties settings.
> 
> The sorting function here takes place on insertion into any connection on 
> which a BucketPrioritizer is set. Once a FlowFile has been sorted into a 
> bucket we maintain that state so that each time it moves into a new 
> connection we already know in which bucket it should be placed without 
> needing to have a BucketPrioritizer set on that connection. Each bucket in a 
> connection is just a FIFO queue so no additional sorting is done. You should 
> only have to configure connections to use the BucketPrioritizer at points in 
> your flow where you believe you'll have enough information to accurately 
> determine priority but not beyond that point unless you want to re-evaluate 
> downstream for some reason. There is administration involved in setting these 
> BucketPrioritizers on some connections but it should be minimal per flow 
> (sometimes as few as one).
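
[Editorial illustration, not part of the original message.] The insertion behaviour described above might be sketched roughly as follows. This is only a minimal model under stated assumptions, not the prototype's code: the `Connection` class, the `"bucket"` key stored on the FlowFile, the `DEFAULT_BUCKET` name, and the choice to send FlowFiles that match no rule to the default bucket are all hypothetical.

```python
from collections import deque

DEFAULT_BUCKET = "unevaluated"  # assumed name for FlowFiles with no known rule


class Connection:
    """Hypothetical connection holding one FIFO bucket per priority rule."""

    def __init__(self, rules, has_bucket_prioritizer=False):
        # rules: list of (rule_name, predicate), ordered highest priority first
        self.rules = rules
        self.has_bucket_prioritizer = has_bucket_prioritizer
        self.buckets = {name: deque() for name, _ in rules}
        self.buckets[DEFAULT_BUCKET] = deque()

    def put(self, flowfile):
        # Evaluate against the ruleset only where a BucketPrioritizer is set;
        # elsewhere reuse the bucket already recorded on the FlowFile.
        if self.has_bucket_prioritizer:
            flowfile["bucket"] = self._evaluate(flowfile)
        bucket = flowfile.get("bucket", DEFAULT_BUCKET)
        self.buckets[bucket].append(flowfile)  # plain FIFO, no further sorting

    def _evaluate(self, flowfile):
        # The highest-ranked rule the FlowFile satisfies wins.
        for name, predicate in self.rules:
            if predicate(flowfile):
                return name
        return DEFAULT_BUCKET  # assumption: no matching rule means default bucket


# Usage: evaluate once at the entry connection, then carry the bucket along.
rules = [("urgent", lambda ff: ff.get("source") == "sensor"),
         ("bulk", lambda ff: ff.get("source") == "archive")]
entry = Connection(rules, has_bucket_prioritizer=True)
downstream = Connection(rules)  # no BucketPrioritizer configured here

ff = {"source": "sensor"}
entry.put(ff)
downstream.put(entry.buckets["urgent"].popleft())
```

In this sketch the downstream connection never runs the predicates; it relies entirely on the state recorded upstream, which is the point of maintaining the bucket assignment.
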
> 
> Some additional information: When you delete a rule, each FlowFile that was 
> already associated with that rule will be re-evaluated against the ruleset 
> the next time it enters a connection, regardless of whether or not a 
> BucketPrioritizer was set on that connection. 
> Also FlowFiles that have yet to be evaluated (have yet to encounter a 
> BucketPrioritizer) will not be staggered. This was a design decision that if 
> we don't know what a priority is for a given FlowFile we should get it to 
> that point in the flow as soon as possible. This decision was a result of 
> empirical evidence that when we did stagger unevaluated data an incoming flow 
> of high priority data slowed its own upstream processing down once it was 
> identified and processed as high priority.
> 
> Multi-tenancy: Agreed that a global priority list could be too restrictive 
> for multi-tenancy and should be addressed.
> 
> Per swapping, this is an area where I admittedly need to put more thought 
> into my implementation because there is plenty of room for improvement. Right 
> now I'm just swapping files to disk in order of least to greatest priority 
> but they are all stored together. Therefore they're read back into memory in 
> order of least to greatest priority. More work should be done here.
> 
>  - Jon
> ________________________________
> From: Joe Witt <[email protected]>
> Sent: Thursday, October 17, 2019 8:12:52 AM
> To: [email protected]
> Subject: EXT: Re: [Discuss] Data prioritization - A proposed solution
> 
> Jon
> 
> Probably some details I don't quite understand yet so responses here are to
> get there...
> 
> The concept for scheduling is interesting.  Does this basically work around
> the fact that we have an unfair scheduler so this has queue implementations
> which pretend data is not available when it knows that there is higher
> priority data available elsewhere thus returning more threads to the pool
> faster to increase the likelihood that queues with higher priority data
> will get served more often?
> 
> The notion of prioritization implies there is a sorting function happening
> somewhere.  NiFi now does sorting on insertion to every queue.  At what
> points are you suggesting sorting can be done/reduced to?
> 
> Administration: The existing model does require each prioritizer to be set
> for each queue.  Yours does as well - to opt into this you'd have to select
> the BucketPrioritizer right?  It seems like you're saying the priority
> ruleset would be provided at the flow controller level and be enforced by
> all connections which leverage this prioritizer.  For large multi-tenant
> nifi flows having a global ruleset might be too limiting but maybe we just
> don't worry about that yet.
> 
> How does this idea work with the fact that queues as they reach a given
> threshold have their data swapped out to disk and as data gets worked off
> the flowfiles get swapped back into memory?
> 
> Thanks
> Joe
> 
> On Thu, Oct 17, 2019 at 7:28 AM Kessler, Jon <[email protected]> wrote:
> 
>> I want to start a discussion about a new prioritization mechanism that
>> addresses some of the issues that I believe exist in the current solution.
>> These issues are:
>> 
>> - Scheduling: No consideration is given to data priority when determining
>> which component is given the next available thread with which to work
>> - Constant sorting: Because all flowfiles in a given connection share the
>> same PriorityQueue they must be sorted every time they move. While this
>> sort is efficient it can add up as queues grow deep.
>> - Administration: There is a costly human element to managing the value
>> used as a priority ranking as priorities change. You must also ensure every
>> connection in the appropriate flow has the proper prioritizer assigned to
>> it to make use of the property.
>> 
>> We have developed a prototype of a new FlowFileQueue implementation that
>> addresses these issues. Use of this implementation is controlled via
>> nifi.properties so you can opt in or out system-wide without doing a lot of
>> configuration. Its design goals are:
>> 
>>  - Instead of using the value of a FlowFile attribute as a ranking,
>> maintain a set of expression language rules to define your priorities. The
>> highest ranked rule that a given FlowFile satisfies will be that FlowFile's
>> priority
>>  - Because we have a finite set of priority rules we can utilize a bucket
>> sort in our connections. One bucket per priority rule. The bucket/rule with
>> which a FlowFile is associated will be maintained so that as it moves
>> through the system we do not have to re-evaluate that FlowFile against our
>> ruleset unless we have reason to do so.
>>  - Control where in your flow FlowFiles are evaluated against the ruleset
>> with a new Prioritizer implementation: BucketPrioritizer.
>>  - When this queue implementation is polled it will be able to check
>> state to see if any data of a higher priority than what it currently
>> contains recently (within 5s) moved elsewhere in the system. If higher
>> priority data has recently moved elsewhere, the connection will only
>> provide a FlowFile X% of the time where X is defined along with the rule.
>> This allows higher priority data to have more frequent access to threads
>> without thread-starving lower priority data.
>>  - Rules will be managed via a menu option for the flow and changes to
>> them take effect instantly. This allows you to change your priorities
>> without stopping/editing/restarting various components on the graph.
>> 
>> I intend to contribute this solution but first want to solicit input and
>> opinions.
>> 
>>  - Jon Kessler
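
[Editorial illustration, not part of the original thread.] The staggered polling described in the design goals could look something like the sketch below. It is one reading of the proposal, not the prototype's implementation: `SharedState`, `StaggeredQueue`, the use of integer ranks (lower number means higher priority), and the injectable `rng` are all assumptions made for illustration. Only the 5-second window and the per-rule percentage come from the text.

```python
import random
import time
from collections import deque

RECENT_WINDOW_SECONDS = 5.0  # "within 5s" in the proposal


class SharedState:
    """Hypothetical system-wide record of when each rule's data last moved."""

    def __init__(self):
        self.last_moved = {}  # rule rank -> timestamp of most recent move

    def record_move(self, rank, now):
        self.last_moved[rank] = now

    def higher_priority_moved_recently(self, rank, now):
        # Assumption: a lower rank number means a higher priority.
        return any(r < rank and now - t <= RECENT_WINDOW_SECONDS
                   for r, t in self.last_moved.items())


class StaggeredQueue:
    """Hypothetical queue that sometimes pretends to be empty."""

    def __init__(self, state, percents, rng=random.random):
        # percents: rank -> X, the % of polls served while staggered
        self.state = state
        self.percents = percents
        self.rng = rng
        self.buckets = {rank: deque() for rank in percents}

    def put(self, rank, flowfile):
        self.buckets[rank].append(flowfile)

    def poll(self, now=None):
        now = time.monotonic() if now is None else now
        for rank in sorted(self.buckets):
            bucket = self.buckets[rank]
            if not bucket:
                continue
            # Only the highest-priority data this queue holds is considered.
            staggered = self.state.higher_priority_moved_recently(rank, now)
            if staggered and self.rng() * 100 >= self.percents[rank]:
                return None  # pretend to be empty, freeing the thread
            self.state.record_move(rank, now)
            return bucket.popleft()
        return None  # genuinely empty
```

Because a staggered queue still answers X% of the time, lower priority data keeps moving, just less often, which matches the stated goal of avoiding thread starvation.
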
