Ismael:

I'd be interested in submitting a PR for #1 & #2, but I think the fix for #4 
would be better suited to someone with a fuller understanding of Kafka.  I may 
also need a bit of help with how to structure the unit tests, if any are needed.

Do you have any recommendations for what unit tests you would want around this 
change?
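
For example, something along these lines is what I was picturing (just a
sketch -- it assumes a test-only way to inspect the internal buffers,
e.g. package-private accessors, which don't exist today):

    import static org.junit.Assert.assertNull;

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream;
    import org.junit.Test;

    public class KafkaLZ4BlockOutputStreamTest {
        @Test
        public void closeShouldReleaseCompressionBuffers() throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            KafkaLZ4BlockOutputStream lz4 = new KafkaLZ4BlockOutputStream(out);
            lz4.write(new byte[1024]);  // partially fill a block
            lz4.close();                // should release both internal buffers
            // Hypothetical accessors that would need adding for testability:
            assertNull(lz4.buffer());
            assertNull(lz4.compressedBuffer());
        }
    }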

- Kyle

-----Original Message-----
From: isma...@gmail.com [mailto:isma...@gmail.com] On Behalf Of Ismael Juma
Sent: Wednesday, January 31, 2018 6:35 PM
To: dev <dev@kafka.apache.org>
Subject: Re: Excessive Memory Usage with Compression enabled and possible 
resolutions

Hi Kyle,

Are you interested in submitting a pull request?

Ismael

On Wed, Jan 31, 2018 at 3:00 PM, Kyle Tinker <ktin...@workforcesoftware.com>
wrote:

> Ismael,
>
> I have filed https://issues.apache.org/jira/browse/KAFKA-6512 for this
> issue.  I could not find a target version field.  Let me know if you
> need any additional information.  I'm new to this project, so hopefully
> the format is what you were looking for.
>
> - Kyle
>
> -----Original Message-----
> From: Ismael Juma [mailto:isma...@gmail.com]
> Sent: Tuesday, January 30, 2018 9:01 PM
> To: dev <dev@kafka.apache.org>
> Subject: Re: Excessive Memory Usage with Compression enabled and 
> possible resolutions
>
> Thanks for the report. I haven't looked at the code, but it seems like 
> we would want to do both 1 and 2. Can you please file a JIRA with 
> 1.1.0 as the target version?
>
> Ismael
>
> On 30 Jan 2018 5:46 pm, "Kyle Tinker" <ktin...@workforcesoftware.com>
> wrote:
>
> > I'm using Kafka 1.0.0 and the Java producer.
> >
> > I've noticed high memory usage in the producer when enabling
> > compression (gzip or lz4).  I don't observe the behavior with
> > compression off, but with it on I'll run out of heap (2GB).  Using a
> > Java profiler, I see the data is in the KafkaLZ4BlockOutputStream
> > (or the related class for gzip).  I see that
> > MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with
> > this, but is not successful.  I'm most likely network bottlenecked,
> > so I expect the producer buffers to be full while the job is running
> > and potentially a lot of unacknowledged records.
> >
> > I've tried using the default buffer.memory with 20 producers (across
> > 20 threads) and sending data as quickly as I can.  I've also tried
> > 1MB of buffer.memory, which seemed to reduce memory consumption, but
> > I could still run OOM in certain cases.  I have
> > max.in.flight.requests.per.connection set to 1.  In short, I should
> > only have ~20 MB (20 * 1MB) of data in buffers, but Kafka can easily
> > exhaust the 2000 MB heap.
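> >
> > For reference, each producer is configured roughly like this (a
> > representative sketch; the broker address and serializers are
> > placeholders rather than my exact setup):
> >
> >     import java.util.Properties;
> >     import org.apache.kafka.clients.producer.KafkaProducer;
> >
> >     Properties props = new Properties();
> >     props.put("bootstrap.servers", "broker:9092");  // placeholder
> >     props.put("compression.type", "lz4");           // or "gzip"
> >     props.put("buffer.memory", "1000000");          // 1MB per producer
> >     props.put("max.in.flight.requests.per.connection", "1");
> >     props.put("key.serializer",
> >         "org.apache.kafka.common.serialization.StringSerializer");
> >     props.put("value.serializer",
> >         "org.apache.kafka.common.serialization.StringSerializer");
> >     KafkaProducer<String, String> producer = new KafkaProducer<>(props);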
> >
> > In looking at the code more, it looks like KafkaLZ4BlockOutputStream
> > doesn't clear the compressedBuffer or buffer when close() is called.
> > In my heap dump, both of those are ~65k each, meaning that each batch
> > is taking up ~148k of space, of which ~131k is the two buffers
> > (buffer.memory=1,000,000 and messages are 1k each until the batch
> > fills).
> >
> > Kafka tries to manage memory usage by calling
> > MemoryRecordsBuilder:closeForRecordAppends(), which is documented as
> > "Release resources required for record appends (e.g. compression
> > buffers)".  However, this method doesn't actually clear those buffers
> > because KafkaLZ4BlockOutputStream.close() only writes the block and
> > end mark and closes the output stream.  It doesn't actually clear the
> > buffer and compressedBuffer in KafkaLZ4BlockOutputStream.  Those stay
> > allocated in RAM until the block is acknowledged by the broker,
> > processed in Sender:handleProduceResponse(), and the batch is
> > deallocated.  This memory usage therefore increases, possibly without
> > bound.  In my test program, the program died with approximately 345
> > unprocessed batches per producer across 20 producers (roughly
> > 345 * 148k = ~50 MB per producer, or ~1 GB total), despite having
> > max.in.flight.requests.per.connection=1.
> >
> > There are a few possible optimizations I can think of:
> > 1) We could declare KafkaLZ4BlockOutputStream.buffer and
> > compressedBuffer as non-final and null them in the close() method
> > (see the sketch after this list)
> > 2) We could declare MemoryRecordsBuilder.appendStream as non-final
> > and null it in the closeForRecordAppends() method (also sketched
> > below)
> > 3) We could have the ProducerBatch discard the recordsBuilder in
> > closeForRecordAppends(); however, this is likely a bad idea because
> > the recordsBuilder contains significant metadata that is likely
> > needed after the stream is closed.  It is also final.
> > 4) We could try to limit the number of unacknowledged batches in
> > flight.  This would bound the maximum memory usage but may
> > negatively impact performance.
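> >
> > A rough sketch of #1 (untested; the field and method names are from
> > my reading of the 1.0.0 source, and the exact close() steps may
> > differ):
> >
> >     // In KafkaLZ4BlockOutputStream, with buffer/compressedBuffer
> >     // declared non-final so they can be released on close:
> >     @Override
> >     public void close() throws IOException {
> >         try {
> >             if (!finished) {
> >                 writeBlock();    // flush any partially filled block
> >                 writeEndMark();  // terminate the LZ4 frame
> >                 flush();
> >                 finished = true;
> >             }
> >             if (out != null) {
> >                 out.close();
> >                 out = null;
> >             }
> >         } finally {
> >             buffer = null;            // release ~64k block buffer
> >             compressedBuffer = null;  // release ~64k compressed buffer
> >         }
> >     }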
> >
> > Fix #1 would only improve the LZ4 path, and not any other
> > algorithms.  Fix #2 would improve all algorithms, compression and
> > otherwise.  Of the first three proposed here, it seems the best.  It
> > would also involve checking appendStreamIsClosed in every usage of
> > appendStream within MemoryRecordsBuilder to avoid NPEs; a sketch
> > follows.
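> >
> > Roughly (again just a sketch; it assumes appendStream is made
> > non-final, and a null check stands in for the appendStreamIsClosed
> > flag):
> >
> >     // In MemoryRecordsBuilder:
> >     public void closeForRecordAppends() {
> >         if (appendStream != null) {
> >             try {
> >                 // flushes remaining compressed data into the buffer
> >                 appendStream.close();
> >             } catch (IOException e) {
> >                 throw new KafkaException(e);
> >             } finally {
> >                 // drop the stream, and its compression buffers with it
> >                 appendStream = null;
> >             }
> >         }
> >     }
> >
> >     // Guard to call before every use of appendStream:
> >     private void ensureOpenForRecordAppend() {
> >         if (appendStream == null)  // i.e. appendStreamIsClosed
> >             throw new IllegalStateException(
> >                 "Tried to append to a closed MemoryRecordsBuilder");
> >     }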
> >
> > Are there any thoughts or suggestions on how to proceed?
> >
> > If requested I can provide standalone testcase code demonstrating 
> > this problem.
> >
> > Thanks,
> > -Kyle
> >
>
