For a little more background, the problem we've found in some use cases is that it's difficult to control the processing time when handling batches of records. If processing takes longer than the consumer's session timeout, the member is kicked out of the group, which can cause excessive rebalancing and duplicate consumption. There aren't particularly convenient ways to handle this currently, so we're proposing to give the user finer control over the number of records returned in each call to poll(). In those use cases, this makes it easier to tune the session timeout so that unexpected rebalances become unlikely.
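To make the intended usage concrete, here is a minimal sketch of a bounded poll loop, assuming the *max.poll.messages* config name proposed in this thread (neither the final name nor the semantics have been settled) and the 0.9-era poll(long timeout) API:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class BoundedPollLoop {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-group");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            // Hypothetical config from this thread: cap the number of records a
            // single poll() may return, so per-batch processing time stays well
            // below the session timeout.
            props.put("max.poll.messages", "100");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic"));
                while (true) {
                    // With at most 100 records per batch, each iteration is
                    // bounded, so the loop gets back to poll() before the
                    // session timeout expires.
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        process(record);
                    }
                }
            }
        }

        private static void process(ConsumerRecord<String, String> record) {
            // placeholder for application-specific per-record work
        }
    }

The point of the sketch is only that bounding the batch size bounds the time between successive poll() calls; the config name and default are assumptions.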
I actually don't have a strong preference between the two options you mentioned, Jens, but I assume the configuration option will be more palatable since it preserves compatibility without needing to overload poll(). One question I have is how the consumer will enforce fairness between partitions given the constraint on the number of records returned. We'd want to ensure that all partitions are consumed when data is available. A simple option might be to round-robin between the partitions when filling the ConsumerRecords collection (a rough sketch follows below the quoted message). We'll try to work out some of these details and update the wiki.

-Jason

On Tue, Dec 22, 2015 at 3:50 PM, Jens Rantil <jens.ran...@tink.se> wrote:

> Hi Kafkaians,
>
> Jason Gustafson and I have been working on a KIP to make it possible to
> set the maximum number of messages that a poll() call will return in the
> new Consumer API. You can find the KIP here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records
>
> It contains a full motivation, as well as links to JIRAs and mailing list
> discussion.
>
> We are fairly done with the motivation as to why this is a needed feature.
> However, there seem to be two approaches to limiting the number of
> messages returned from a poll() call:
>
> 1. Introduce a second parameter to poll() so that it has the following
> signature: ConsumerRecords<K, V> poll(long timeout, int maxMessages).
> 2. Introduce a configuration option, say *max.poll.messages*, which is
> given at KafkaConsumer construction.
>
> Arguments for 1):
>
> - It would make the Java API more explicit.
> - It would avoid an additional configuration option. A counter-argument
> is that there are already many configuration options; avoiding yet
> another would not make much difference.
> - It would allow the poll loop to dynamically change the max number of
> messages returned. However, this sounds like a non-requirement.
>
> Arguments for 2):
>
> - 1) would introduce yet another public method that would have to be
> maintained for a long time in the future.
> - The *max.poll.messages* approach would make the limit known at
> construction time, allowing optimizations (e.g. reducing the number of
> messages fetched) to be set up once instead of on every call. It would
> thus keep the code paths in each poll() call simpler, since message
> limiting could be abstracted away.
>
> Based on Jason's input on the mailing list, on JIRA and in the wiki, it
> feels like he's been leaning towards 1). I am personally leaning towards
> 2). What do you think? Do you have any other input?
>
> Looking forward to your input - cheers,
>
> Jens
>
> --
> Jens Rantil
> Backend engineer
> Tink AB
>
> Email: jens.ran...@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se
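As a sketch of the round-robin fairness idea mentioned above: this is not KafkaConsumer internals, and the helper name, the per-partition buffer representation, and the maxRecords parameter are all hypothetical, but it shows how taking one record from each non-empty partition per pass keeps any single partition from monopolizing a poll():

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Queue;
    import org.apache.kafka.common.TopicPartition;

    public final class RoundRobinDrainer {
        // Drain up to maxRecords records from the per-partition buffers,
        // taking one record from each non-empty partition in turn.
        public static <T> List<T> drainRoundRobin(
                Map<TopicPartition, Queue<T>> buffers, int maxRecords) {
            List<T> drained = new ArrayList<>(maxRecords);
            boolean progress = true;
            while (drained.size() < maxRecords && progress) {
                progress = false;
                for (Queue<T> buffer : buffers.values()) {
                    if (drained.size() >= maxRecords) {
                        break;
                    }
                    T record = buffer.poll(); // null if this partition is empty
                    if (record != null) {
                        drained.add(record);
                        progress = true; // some partition still had data
                    }
                }
            }
            return drained;
        }
    }

A real implementation would presumably also remember where the iteration left off between polls, so that partitions early in the iteration order don't get a persistent advantage when the limit is hit mid-pass.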