[ 
https://issues.apache.org/jira/browse/STORM-31?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15170756#comment-15170756
 ] 

Vibha Goyal commented on STORM-31:
----------------------------------

Hi, 

I understand that max.spout.pending is a simple way to achieve back-pressure 
with acking-enabled.

I want to understand, do we still need max.spout.pending to throttle the spout 
now that the back-pressure has been implemented? 

Thanks!

> Auto-tune max spout pending
> ---------------------------
>
>                 Key: STORM-31
>                 URL: https://issues.apache.org/jira/browse/STORM-31
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: James Xu
>              Labels: HighPriority
>
> https://github.com/nathanmarz/storm/issues/385
> It should be relatively easy to have spouts automatically tune max spout 
> pending so that topologies can handle high throughputs with minimal tuning. A 
> spout should look at the average complete latency over the last 10 minutes 
> and compare that to the message timeout. If it's significantly lower, it 
> should increase pending. If it's close, it should decrease.
> --------------------------------------------------------------------------------------------------
> @jasonjckn: So as you know there's no way to count # of replays in storm, or 
> more generally, there's no storm spout concept of consumption progress. There 
> is # of acked tuples, but this doesn't differentiate between replays. While I 
> think the design choice is actually appropriate when you think how 
> consumption means different things in kafka, the variety of different spouts 
> that can exist, etc. This does lead to problems with any auto tune max spout 
> pending algorithm I could possibly devise based on the current executor 
> stats. For example with kafka a particular offset references a block of 
> messages compressed together, not individual messages, and when a particular 
> tuple fails this will lead to that entire block of tuples being replayed, and 
> possibly other blocks that came afterwords. Now imagine a kafka block of 1000 
> tuples, and there's a 1% chance of a failed tuple, so this can lead to zero 
> progress, because every single kafka block has at least 1 tuple that failed. 
> However this also has a nice ratio of # acks / # emits, because all of the 
> acks are replayes. Contrasting this where the user logic does some kind of 
> batching, and database batch updates, this tends to have the property where 
> entire batches succeed or fail (This is how trident works), so if 4 out of 5 
> batches were to succeed, and a batch includes 1 kafka block from each 
> partition, then you're making a lot of progress, but your # acks / # emits 
> ratio is 4/5. So lower emit ratio, but a lot of progress is being made.
> So here's an idea to solve this:
> Expose setMaxSpoutPending to spout implementations so they can call it 
> whenever they want. Then the user would enable auto tune max spout pending as 
> a KafkaSpout option or KestrelSpout Option (instead of topology config).
> Then you would write a generic max spout pending algorithm which would be a 
> thermostat on 'consumption throughput'. 'amount consumed' is an abstract 
> concept, and it's an input into this algorithm. 'amount consumed' is assumed 
> to be a monotonically increasing number and to indicate durable consumption 
> occurring. So in the kafka implementation you would plugin this algorithm, 
> and 'amount consumed' would be defined as the oldest offset to a kafka block 
> that's been acked.
> I think moving this logic into the spout implementation a lot of sense when 
> consumption means different things depending on the whatever is backing the 
> spout. I thought about introducing tuple replays as a concept in storm, but 
> there's no easy way to track this at the tuple level in kafka, just the kafka 
> block level. Also in trident this makes a lot of sense because the # of 
> pending tuples is actually the number of parallel batches. So if you set max 
> spout pending to 5, then that's 5 parallel batches. By allowing the trident 
> master coordinator spout to set it's own max spout pending it could do 
> something smart for optimizing the number of parallel batches.
> As a side note, a kafka block may contain any number of messages, and the 
> kafka offsets jump in steps of block size (block size is not fixed either), 
> it's basically just an offset on disk, so this would lead to the auto tune 
> max spout pending maximizing not on message throughput, but rather compressed 
> data throughput. I don't think this matters one way or the other, you could 
> always do message throughput by counting the number of messages in each 
> block, but that's a more complex implementation, i'd just do the simpler one 
> first (the oldest unacked kafka offset).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to