Thanks Dhruvil. That makes sense. On Thu, Apr 12, 2018 at 2:49 AM, Dhruvil Shah <dhru...@confluent.io> wrote:
> Hi Rajini, > > Thanks for the comments. > > Which config parameters are these referring to? > > This refers to a proposal that was later rejected. I have removed this goal > from the KIP as it is no longer valid. > > What exactly is a chunk going to be > > I have updated the KIP to remove references to the fixed buffers. I started > out thinking we would have fix-sized buffers with a memory pool but it > seems we could achieve what we want without the added complexity. > > My current proposal is that we read a set of message batches into memory > till we reach a pre-configured size threshold, say 16kB (the pre-configured > size might need to be larger if the first message batch is larger than > that). We down-convert these batches and hold them in a temporary buffer > till are able to send out all the messages. And then repeat the same > process for subsequent batches. > > Because we down-convert a maximum of 16kB worth of messages at any given > point in time, the memory utilization should be much more deterministic, > i.e. we have a maximum of about 16kB memory allocated for each in-flight > `FetchResponse` that requires down-conversion. > > Let me know if this makes sense. > > Thanks, > Dhruvil > > On Wed, Apr 11, 2018 at 6:38 AM, Rajini Sivaram <rajinisiva...@gmail.com> > wrote: > > > Hi Dhruvil, > > > > Thanks for the KIP. This is a great improvement to reduce OOMs in brokers > > during down-conversion. > > > > Just a couple of minor questions: > > > > The goals state: "*Provide appropriate configuration parameters to manage > > maximum memory usage during down-conversion on the broker.*" > > Which config parameters are these referring to? > > > > What exactly is a chunk going to be - will it be all the records for a > > partition (which could be quite large?) or one message batch? The KIP > talks > > about pre-allocated fixed size buffers, but your last note suggests that > > you would use temporary buffers created for each partition. Do we need to > > consider using a memory pool for these or do we think that the buffers > will > > be small enough to cope with lots of connections with downconversions? > This > > will be a clear improvement over what we have now in any case, but just > > checking anyway. > > > > Regards, > > > > Rajini > > > > On Sat, Apr 7, 2018 at 12:29 AM, Dhruvil Shah <dhru...@confluent.io> > > wrote: > > > > > Hi Ted, > > > > > > Thanks for the comments. > > > > > > > > > > > > *>> bq. we can perform down-conversion when Records.writeTo is > called.>> > > > Wouldn't this delay the network thread (though maybe the duration is > > > short)>> ?* > > > Yes, this is noted in the Cons section. I think we have a precedent for > > > this in the `SSLTransportLayer` implementation, so trying to follow a > > > similar model here. > > > > > > > > > *>> Can you expand on the structure of LazyDownConvertedRecords in more > > > detail ?* > > > I added the basic structure to the KIP. > > > > > > > > > > > > > > > *>> bq. even if it exceeds fetch.max.bytes>> I did a brief search but > > > didn't see the above config. Did you mean>> message.max.bytes>> ?* > > > Yes, thanks for the correction. > > > > > > > > > *>> After the buffers grow, is there a way to trim them down if > > > subsequent>> down-conversion doesn't need that much memory ?* > > > The easiest way probably is to allocate and use a new buffer for each > > > topic-partition. I think we would not require any trimming down if we > do > > > this. The buffer will be available for garbage collection as soon as we > > are > > > done serializing and writing all messages to the socket for the > > particular > > > topic-partition. > > > > > > Thanks, > > > Dhruvil > > > > > > > > > On Fri, Apr 6, 2018 at 3:23 PM, Ted Yu <yuzhih...@gmail.com> wrote: > > > > > > > bq. we can perform down-conversion when Records.writeTo is called. > > > > > > > > Wouldn't this delay the network thread (though maybe the duration is > > > short) > > > > ? > > > > > > > > Can you expand on the structure of LazyDownConvertedRecords in more > > > detail > > > > ? > > > > > > > > bq. even if it exceeds fetch.max.bytes > > > > > > > > I did a brief search but didn't see the above config. Did you mean > > > > message.max.bytes > > > > ? > > > > > > > > bq. with possibility to grow if the allocation > > > > > > > > After the buffers grow, is there a way to trim them down if > subsequent > > > > down-conversion doesn't need that much memory ? > > > > > > > > Thanks > > > > > > > > > > > > On Fri, Apr 6, 2018 at 2:56 PM, Dhruvil Shah <dhru...@confluent.io> > > > wrote: > > > > > > > > > Hi, > > > > > > > > > > I created a KIP to help mitigate out of memory issues during > > > > > down-conversion. The KIP proposes introducing a configuration that > > can > > > > > prevent down-conversions altogether, and also describes a design > for > > > > > efficient memory usage for down-conversion. > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > > > > 283%3A+Efficient+Memory+Usage+for+Down-Conversion > > > > > > > > > > Suggestions and feedback are welcome! > > > > > > > > > > Thanks, > > > > > Dhruvil > > > > > > > > > > > > > > >