Hey Dhruvil,

Thanks for the KIP. Looks good overall. I have a few questions about the new configs:
1. I'm mainly wondering how necessary the configs are given the improvements in this KIP to reduce memory pressure from down-conversion. The reason I ask is that we'll be stuck with this config for a long time, so we should make sure we really need it. Ideally the improvements here are enough to make the memory problem a non-issue, but if we're not convinced they achieve that, the configs may have some value.

2. It seems like the intent is to use these configs to disable down-conversion specifically. Would it make sense to make the name more specific (e.g. `log.disable.downconversion`)? Or do you think there are other benefits of this config outside of this use case?

3. Does this config apply to replica fetchers? I think it would be reasonable to disable down-conversion for replica fetchers in all cases since it should not be needed anyway and can cause weird log divergence edge cases. I opened https://issues.apache.org/jira/browse/KAFKA-6392 about this some time ago. Would it be reasonable to include this as part of this KIP?

4. You mention in the KIP that we would use the invalid request error code if the version is disallowed. Wouldn't it make more sense to use unsupported version?

Thanks,
Jason

On Wed, Apr 11, 2018 at 6:38 AM, Rajini Sivaram <rajinisiva...@gmail.com> wrote:

> Hi Dhruvil,
>
> Thanks for the KIP. This is a great improvement to reduce OOMs in brokers during down-conversion.
>
> Just a couple of minor questions:
>
> The goals state: "*Provide appropriate configuration parameters to manage maximum memory usage during down-conversion on the broker.*" Which config parameters are these referring to?
>
> What exactly is a chunk going to be - will it be all the records for a partition (which could be quite large) or one message batch? The KIP talks about pre-allocated fixed-size buffers, but your last note suggests that you would use temporary buffers created for each partition. Do we need to consider using a memory pool for these, or do we think that the buffers will be small enough to cope with lots of connections doing down-conversion? This will be a clear improvement over what we have now in any case, but just checking anyway.
>
> Regards,
>
> Rajini
>
> On Sat, Apr 7, 2018 at 12:29 AM, Dhruvil Shah <dhru...@confluent.io> wrote:
>
> > Hi Ted,
> >
> > Thanks for the comments.
> >
> > *>> bq. we can perform down-conversion when Records.writeTo is called.
> > >> Wouldn't this delay the network thread (though maybe the duration is short)?*
> >
> > Yes, this is noted in the Cons section. I think we have a precedent for this in the `SslTransportLayer` implementation, so I am trying to follow a similar model here.
> >
> > *>> Can you expand on the structure of LazyDownConvertedRecords in more detail?*
> >
> > I added the basic structure to the KIP.
> >
> > *>> bq. even if it exceeds fetch.max.bytes
> > >> I did a brief search but didn't see the above config. Did you mean message.max.bytes?*
> >
> > Yes, thanks for the correction.
> >
> > *>> After the buffers grow, is there a way to trim them down if subsequent down-conversion doesn't need that much memory?*
> >
> > The easiest way is probably to allocate and use a new buffer for each topic-partition. I think we would not require any trimming if we do this: the buffer becomes available for garbage collection as soon as we are done serializing and writing all messages to the socket for that topic-partition.
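> >
> > To make this a bit more concrete, here is a rough sketch of the kind of structure I have in mind (class and method names below are placeholders rather than the final API; the actual structure is in the KIP):
> >
> > import java.io.IOException;
> > import java.nio.ByteBuffer;
> > import java.nio.channels.GatheringByteChannel;
> >
> > // Placeholder for whatever performs the actual conversion of one
> > // "chunk" (e.g. one batch, or one partition's worth of batches)
> > // to the older message format.
> > interface ChunkDownConverter {
> >     ByteBuffer convertNextChunk();   // null when nothing is left to convert
> > }
> >
> > // Holds a reference to the original (unconverted) records and defers
> > // conversion until the network thread writes the response to the socket,
> > // so only one converted chunk is held in memory at a time.
> > class LazyDownConvertedRecords {
> >     private final ChunkDownConverter converter;
> >     private ByteBuffer pending;      // current converted chunk, if any
> >
> >     LazyDownConvertedRecords(ChunkDownConverter converter) {
> >         this.converter = converter;
> >     }
> >
> >     // Invoked when writing to the socket (i.e. from Records.writeTo);
> >     // converts lazily and writes as much as the channel accepts.
> >     long writeTo(GatheringByteChannel channel) throws IOException {
> >         if (pending == null || !pending.hasRemaining())
> >             pending = converter.convertNextChunk();  // previous chunk (if any) becomes eligible for GC
> >         if (pending == null)
> >             return -1;                               // nothing left to send
> >         return channel.write(pending);               // may write fewer bytes than remaining
> >     }
> > }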
> >
> > Thanks,
> > Dhruvil
> >
> > On Fri, Apr 6, 2018 at 3:23 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >
> > > bq. we can perform down-conversion when Records.writeTo is called.
> > >
> > > Wouldn't this delay the network thread (though maybe the duration is short)?
> > >
> > > Can you expand on the structure of LazyDownConvertedRecords in more detail?
> > >
> > > bq. even if it exceeds fetch.max.bytes
> > >
> > > I did a brief search but didn't see the above config. Did you mean message.max.bytes?
> > >
> > > bq. with possibility to grow if the allocation
> > >
> > > After the buffers grow, is there a way to trim them down if subsequent down-conversion doesn't need that much memory?
> > >
> > > Thanks
> > >
> > > On Fri, Apr 6, 2018 at 2:56 PM, Dhruvil Shah <dhru...@confluent.io> wrote:
> > >
> > > > Hi,
> > > >
> > > > I created a KIP to help mitigate out of memory issues during down-conversion. The KIP proposes introducing a configuration that can prevent down-conversions altogether, and also describes a design for efficient memory usage for down-conversion.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion
> > > >
> > > > Suggestions and feedback are welcome!
> > > >
> > > > Thanks,
> > > > Dhruvil