Ismael: I'd be interested in submitting a PR for #1 and #2, but I think the fix for #4 would be better suited to someone with a fuller understanding of Kafka. I may also need a bit of help with how to structure the unit tests, if any are needed.

Do you have any recommendations for what unit tests you would want around this change?
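To make the question concrete, here is the rough shape I had in mind for a fix-#2 test (an untested sketch: it assumes the MemoryRecords.builder() factory and method names as I read them in the 1.0.0 sources, and uses reflection only because appendStream is private):

    import static org.junit.Assert.assertNull;

    import java.lang.reflect.Field;
    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.TimestampType;
    import org.junit.Test;

    public class MemoryRecordsBuilderReleaseTest {
        // After closeForRecordAppends(), the builder should no longer hold
        // a reference to the compression stream (proposed fix #2), so the
        // codec's internal buffers become eligible for GC.
        @Test
        public void releasesAppendStreamOnCloseForRecordAppends() throws Exception {
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
            MemoryRecordsBuilder builder = MemoryRecords.builder(
                    buffer, CompressionType.LZ4, TimestampType.CREATE_TIME, 0L);
            builder.append(0L, "key".getBytes(), "value".getBytes());
            builder.closeForRecordAppends();

            // appendStream is private, so inspect it via reflection.
            Field f = MemoryRecordsBuilder.class.getDeclaredField("appendStream");
            f.setAccessible(true);
            assertNull("compression stream should be released on close",
                    f.get(builder));
        }
    }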
- Kyle

-----Original Message-----
From: isma...@gmail.com [mailto:isma...@gmail.com] On Behalf Of Ismael Juma
Sent: Wednesday, January 31, 2018 6:35 PM
To: dev <dev@kafka.apache.org>
Subject: Re: Excessive Memory Usage with Compression enabled and possible resolutions

Hi Kyle,

Are you interested in submitting a pull request?

Ismael

On Wed, Jan 31, 2018 at 3:00 PM, Kyle Tinker <ktin...@workforcesoftware.com> wrote:

> Ismael,
>
> I have filed https://issues.apache.org/jira/browse/KAFKA-6512 for this
> issue. I could not find a target version field. Let me know if you need
> any additional information. I'm new to this project, so hopefully the
> format is what you were looking for.
>
> - Kyle
>
> -----Original Message-----
> From: Ismael Juma [mailto:isma...@gmail.com]
> Sent: Tuesday, January 30, 2018 9:01 PM
> To: dev <dev@kafka.apache.org>
> Subject: Re: Excessive Memory Usage with Compression enabled and possible resolutions
>
> Thanks for the report. I haven't looked at the code, but it seems like we
> would want to do both 1 and 2. Can you please file a JIRA with 1.1.0 as
> the target version?
>
> Ismael
>
> On 30 Jan 2018 5:46 pm, "Kyle Tinker" <ktin...@workforcesoftware.com> wrote:
>
> > I'm using Kafka 1.0.0 and the Java producer.
> >
> > I've noticed high memory usage in the producer when compression (gzip
> > or lz4) is enabled. I don't observe the behavior with compression off,
> > but with it on I run out of heap (2 GB). Using a Java profiler, I see
> > the data is held in KafkaLZ4BlockOutputStream (or the equivalent class
> > for gzip). MemoryRecordsBuilder.closeForRecordAppends() tries to deal
> > with this, but is not successful. I'm most likely network-bottlenecked,
> > so I expect the producer buffers to be full while the job is running,
> > with potentially many unacknowledged records.
> >
> > I've tried the default buffer.memory with 20 producers (across 20
> > threads), sending data as quickly as I can. I've also tried 1 MB of
> > buffer.memory, which seemed to reduce memory consumption, but I could
> > still run out of memory in certain cases. I have
> > max.in.flight.requests.per.connection set to 1. In short, I should have
> > only ~20 MB (20 * 1 MB) of data in buffers, yet I can easily exhaust
> > the 2000 MB of heap given to the producer JVM.
> >
> > Looking at the code more closely, KafkaLZ4BlockOutputStream doesn't
> > clear its compressedBuffer or buffer when close() is called. In my heap
> > dump, both are ~65 KB each, meaning each batch takes up ~148 KB, of
> > which ~131 KB is these buffers (with buffer.memory=1,000,000 and 1 KB
> > messages until the batch fills).
> >
> > Kafka tries to manage memory usage by calling
> > MemoryRecordsBuilder.closeForRecordAppends(), which is documented as
> > "Release resources required for record appends (e.g. compression
> > buffers)". However, this method doesn't actually release those buffers:
> > KafkaLZ4BlockOutputStream.close() only writes the final block and end
> > mark and closes the output stream. It never clears the buffer and
> > compressedBuffer fields.
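> > Schematically, the structure is something like this (a simplified
> > paraphrase of the behavior described above, not the literal Kafka
> > source; the class name and sizes are illustrative):
> >
> >     import java.io.IOException;
> >     import java.io.OutputStream;
> >
> >     public final class SimplifiedLz4BlockOutputStream extends OutputStream {
> >         // ~64 KB each with the default block size; pinned for the
> >         // lifetime of the object because the fields are final.
> >         private final byte[] buffer = new byte[1 << 16];
> >         private final byte[] compressedBuffer = new byte[1 << 16];
> >         private final OutputStream out;
> >
> >         public SimplifiedLz4BlockOutputStream(OutputStream out) {
> >             this.out = out;
> >         }
> >
> >         @Override
> >         public void write(int b) throws IOException {
> >             out.write(b); // the real class stages bytes in `buffer` and
> >                           // compresses blocks into `compressedBuffer`
> >         }
> >
> >         @Override
> >         public void close() throws IOException {
> >             // The real close() writes the final block and end mark and
> >             // closes the underlying stream, but it never drops the two
> >             // arrays, so a closed-but-unacknowledged batch still pins
> >             // ~131 KB of heap.
> >             out.close();
> >         }
> >     }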
> > Those buffers stay allocated in RAM until the block is acknowledged by
> > the broker, processed in Sender.handleProduceResponse(), and the batch
> > is deallocated. Memory usage therefore grows, possibly without bound.
> > In my test program, the process died with approximately 345 unprocessed
> > batches per producer (20 producers), despite
> > max.in.flight.requests.per.connection=1.
> >
> > There are a few possible optimizations I can think of:
> > 1) We could declare KafkaLZ4BlockOutputStream.buffer and
> > compressedBuffer as non-final and null them in the close() method
> > (sketched in the P.S. below).
> > 2) We could declare MemoryRecordsBuilder.appendStream as non-final and
> > null it in the closeForRecordAppends() method.
> > 3) We could have ProducerBatch discard the recordsBuilder in
> > closeForRecordAppends(). However, this is likely a bad idea, because
> > the recordsBuilder contains significant metadata that is needed after
> > the stream is closed, and the field is also final.
> > 4) We could limit the number of unacknowledged batches in flight. This
> > would bound the maximum memory usage, but may negatively impact
> > performance.
> >
> > Fix #1 would only help LZ4, not the other algorithms. Fix #2 would
> > improve all algorithms, compressed and otherwise; of the three
> > code-level fixes proposed here, it seems the best. It would also
> > require checking whether appendStream is closed at every use within
> > MemoryRecordsBuilder to avoid NPEs.
> >
> > Are there any thoughts or suggestions on how to proceed?
> >
> > If requested, I can provide a standalone test case demonstrating the
> > problem.
> >
> > Thanks,
> > -Kyle
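> > P.S. For concreteness, fix #1 applied to the simplified stream sketched
> > above would look roughly like this (untested; the end-mark logic is
> > elided, and both fields must first be made non-final):
> >
> >     @Override
> >     public void close() throws IOException {
> >         // Write the final block and end mark as today, then release
> >         // the large arrays so a closed batch stops pinning ~131 KB of
> >         // heap while it waits for its acknowledgement.
> >         out.close();
> >         buffer = null;
> >         compressedBuffer = null;
> >     }
> >
> > Fix #2 is the same move one level up: null out
> > MemoryRecordsBuilder.appendStream in closeForRecordAppends() and guard
> > each later use of appendStream with a closed-stream check.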