[ https://issues.apache.org/jira/browse/KAFKA-6512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rajini Sivaram resolved KAFKA-6512.
-----------------------------------
Resolution: Fixed
Fix Version/s: 1.1.0 (was: 1.2.0)
Implemented options 1) and 2) from the description.
> Java Producer: Excessive memory usage with compression enabled
> --------------------------------------------------------------
>
> Key: KAFKA-6512
> URL: https://issues.apache.org/jira/browse/KAFKA-6512
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 1.0.0
> Environment: Windows 10
> Reporter: Kyle Tinker
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 1.1.0
>
> Attachments: KafkaSender.java
>
>
> h2. User Story
> As a user of the Java producer, I want predictable memory usage for the
> Kafka client so that I can ensure that my system is sized appropriately and
> will be stable even under heavy usage.
> As a user of the Java producer, I want a smaller memory footprint so that my
> systems don't consume as many resources.
> h2. Acceptance Criteria
> * Enabling compression in Kafka should not significantly increase Kafka's
> memory usage
> * The memory usage of Kafka's Java Producer should be roughly in line with
> the buffer size (buffer.memory) and the number of producers declared.
> h2. Additional Information
> I've observed high memory usage in the producer when enabling compression
> (gzip or lz4). I don't observe the behavior with compression off, but with
> it on I'll run out of heap (2GB). Using a Java profiler, I see the data is
> in the KafkaLZ4BlockOutputStream (or related class for gzip). I see that
> MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with this, but
> is not successful. I'm most likely network-bottlenecked, so I expect the
> producer buffers to be full while the job is running, with potentially many
> unacknowledged records.
> I've tried using the default buffer.memory with 20 producers (across 20
> threads) and sending data as quickly as I can. I've also tried 1MB of
> buffer.memory, which seemed to reduce memory consumption, but I could still
> run out of memory in certain cases. I have max.in.flight.requests.per.connection
> set to 1. In short, I should only have ~20 MB (20 x 1 MB) of data in buffers,
> yet Kafka can easily exhaust a 2000 MB heap.
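The producer settings described above can be collected into a plain `java.util.Properties` object. This is a minimal sketch, not the attached KafkaSender.java: the class name `ReproProducerConfig` and the address `localhost:9092` are hypothetical, and only the settings mentioned in this report are set. The result would still need key and value serializers before being passed to a `KafkaProducer`.

```java
import java.util.Properties;

// Hypothetical helper (not from KafkaSender.java) collecting the producer
// settings described in this report. Only java.util.Properties is used, so
// it compiles without kafka-clients on the classpath.
class ReproProducerConfig {

    static Properties build(String bootstrapServers, String compressionType, long bufferMemoryBytes) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // "gzip" or "lz4" showed the growth; it was not observed with compression off.
        props.put("compression.type", compressionType);
        // Total bytes the producer may use to buffer records waiting to be sent.
        props.put("buffer.memory", Long.toString(bufferMemoryBytes));
        // Only one unacknowledged request per broker connection.
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        // The report used 20 producers on 20 threads, each configured like this.
        Properties props = build("localhost:9092", "lz4", 1_000_000L);
        System.out.println(props.getProperty("compression.type"));
    }
}
```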
> Looking at the code more closely, it appears that KafkaLZ4BlockOutputStream
> doesn't clear the compressedBuffer or buffer when close() is called. In my
> heap dump, both of those are ~65k in size, meaning that each batch is
> taking up ~148k of space, of which 131k is buffers (buffer.memory=1,000,000
> and messages are 1k each until the batch fills).
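The heap-dump figures above can be checked with a little arithmetic. This sketch assumes each of the two arrays is a 64 KiB LZ4 block, which is consistent with the ~65k reported; the class name `BatchFootprint` is hypothetical.

```java
import java.util.Locale;

// Worked arithmetic for the heap-dump figures in this report. Hypothetical
// helper; assumes each LZ4 array is a 64 KiB block, consistent with ~65k.
class BatchFootprint {

    static final int LZ4_BLOCK_BYTES = 64 * 1024;

    // buffer + compressedBuffer, both still referenced after close()
    static long compressionBufferBytes() {
        return 2L * LZ4_BLOCK_BYTES;
    }

    // Fraction of a retained batch taken up by the compression buffers.
    static double bufferShare(double bufferKb, double batchKb) {
        return bufferKb / batchKb;
    }

    public static void main(String[] args) {
        System.out.println(compressionBufferBytes());                          // 131072, the "131k"
        System.out.printf(Locale.ROOT, "%.2f%n", bufferShare(131, 148));       // 0.89
    }
}
```

Roughly 88% of each retained batch is compression buffer, in line with the ~90% reduction estimated at the end of the description.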
> Kafka tries to manage memory usage by calling
> MemoryRecordsBuilder:closeForRecordAppends(), which is documented as "Release
> resources required for record appends (e.g. compression buffers)". However,
> this method doesn't actually clear those buffers because
> KafkaLZ4BlockOutputStream.close() only writes the block and end mark and
> closes the output stream. It doesn't actually clear the buffer and
> compressedBuffer in KafkaLZ4BlockOutputStream. Those stay allocated in RAM
> until the block is acknowledged by the broker, processed in
> Sender:handleProduceResponse(), and the batch is deallocated. This memory
> usage therefore increases, possibly without bound. In my test program, the
> program died with approximately 345 unprocessed batches per producer (20
> producers), despite having max.in.flight.requests.per.connection=1.
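Putting the numbers together shows the gap between the expected and the observed footprint. A minimal sketch using only figures quoted in this report (20 producers, buffer.memory=1,000,000, ~345 pending batches per producer, ~148k per batch); the class name is hypothetical and the per-batch size is the heap-dump approximation, not a measured constant.

```java
// Hypothetical estimate comparing what buffer.memory suggests with what the
// failed run was holding, using only numbers quoted in this report.
class RetainedMemoryEstimate {

    // Roughly what the acceptance criteria expect: producers x buffer.memory.
    static long expectedBufferBytes(int producers, long bufferMemoryBytes) {
        return producers * bufferMemoryBytes;
    }

    // Heap held by batches still awaiting acknowledgement, in KB.
    static long retainedKb(int producers, int pendingBatchesPerProducer, long kbPerBatch) {
        return (long) producers * pendingBatchesPerProducer * kbPerBatch;
    }

    public static void main(String[] args) {
        System.out.println(expectedBufferBytes(20, 1_000_000L)); // 20000000 (~20 MB)
        System.out.println(retainedKb(20, 345, 148));            // 1021200 (~1 GB)
    }
}
```

About 1 GB of retained batches, roughly 50 times the ~20 MB that buffer.memory suggests, which is consistent with a 1 GB heap being exhausted.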
> h2. Steps to Reproduce
> # Create a topic named test with plenty of storage
> # Use a connection with a very fast upload pipe and limited download. This
> allows the outbound data to go out, but acknowledgements to be delayed
> flowing in.
> # Download KafkaSender.java (attached to this ticket)
> # Set line 17 to reference your Kafka broker
> # Run the program with a 1 GB heap (-Xmx1g)
> h2. Possible solutions
> There are a few possible optimizations I can think of:
> # We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as
> non-final and null them in the close() method
> # We could declare the MemoryRecordsBuilder.appendStream non-final and null
> it in the closeForRecordAppends() method
> # We could have the ProducerBatch discard the recordsBuilder in
> closeForRecordAppends(); however, this is likely a bad idea because the
> recordsBuilder contains significant metadata that is likely needed after the
> stream is closed. It is also final.
> # We could try to limit the number of non-acknowledged batches in flight.
> This would bound the maximum memory usage but may negatively impact
> performance.
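Option 1 can be illustrated with a simplified stream. `ReleasingBlockOutputStream` is a hypothetical class, not KafkaLZ4BlockOutputStream: compression is replaced by a plain copy and no LZ4 frame header or end mark is written, so only the lifetime of the two arrays is modelled. Once the fields are non-final, close() can drop them so the garbage collector can reclaim them while the owning batch is still waiting for its acknowledgement.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical simplified block stream illustrating option 1. Compression is
// replaced by a plain copy and no LZ4 frame header or end mark is written;
// only the lifetime of the two block arrays is modelled.
class ReleasingBlockOutputStream extends FilterOutputStream {

    private byte[] buffer;            // non-final so close() can drop it
    private byte[] compressedBuffer;  // non-final so close() can drop it
    private int bufferOffset = 0;
    private boolean closed = false;

    ReleasingBlockOutputStream(OutputStream out, int blockSize) {
        super(out);
        this.buffer = new byte[blockSize];
        this.compressedBuffer = new byte[blockSize];
    }

    @Override
    public void write(int b) throws IOException {
        if (closed) throw new IOException("Stream closed");
        if (bufferOffset == buffer.length) writeBlock();
        buffer[bufferOffset++] = (byte) b;
    }

    // Stand-in for compressing one block and writing it downstream.
    private void writeBlock() throws IOException {
        if (bufferOffset == 0) return;
        System.arraycopy(buffer, 0, compressedBuffer, 0, bufferOffset);
        out.write(compressedBuffer, 0, bufferOffset);
        bufferOffset = 0;
    }

    @Override
    public void close() throws IOException {
        if (closed) return;
        try {
            writeBlock();  // flush the final, possibly partial, block
            out.close();
        } finally {
            closed = true;
            // Option 1: drop the arrays so the GC can reclaim them even though
            // the batch owning this stream may wait a long time for its ack.
            buffer = null;
            compressedBuffer = null;
        }
    }

    boolean buffersReleased() {
        return buffer == null && compressedBuffer == null;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        ReleasingBlockOutputStream stream = new ReleasingBlockOutputStream(sink, 4);
        stream.write(new byte[] {1, 2, 3, 4, 5, 6});
        stream.close();
        System.out.println(sink.size() + " " + stream.buffersReleased()); // 6 true
    }
}
```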
>
> Fix #1 would only help LZ4, not the other compression algorithms.
> Fix #2 would help all algorithms, compressed or not. Of the first three
> proposed here, it seems the best. It would also require checking
> appendStreamIsClosed in every usage of appendStream within
> MemoryRecordsBuilder to avoid NPEs.
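Option 2, with the closed check it requires, might look like the following. This is a heavily simplified, hypothetical builder (`SketchRecordsBuilder`), not Kafka's MemoryRecordsBuilder; the record layout (a length prefix followed by the value) is an assumption made only to have something to write. The compressing stream would be whatever `OutputStream` is passed in.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical, heavily simplified builder for option 2 (not Kafka's
// MemoryRecordsBuilder). The append stream, and any compression buffers it
// wraps, is dropped in closeForRecordAppends(); metadata needed later stays.
class SketchRecordsBuilder {

    private DataOutputStream appendStream;  // non-final so it can be nulled
    private int recordCount = 0;
    private long bytesAppended = 0;

    SketchRecordsBuilder(OutputStream compressedOut) {
        this.appendStream = new DataOutputStream(compressedOut);
    }

    boolean isClosedForAppends() {
        return appendStream == null;
    }

    void append(byte[] value) {
        // Each use of appendStream checks the closed state first to avoid an NPE.
        if (isClosedForAppends())
            throw new IllegalStateException("closeForRecordAppends() already called");
        try {
            appendStream.writeInt(value.length);  // assumed length prefix
            appendStream.write(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        recordCount++;
        bytesAppended += Integer.BYTES + value.length;
    }

    void closeForRecordAppends() {
        if (isClosedForAppends()) return;
        try {
            appendStream.close();  // also closes the wrapped compressing stream
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            appendStream = null;   // release the stream and its buffers
        }
    }

    int recordCount() { return recordCount; }

    long bytesAppended() { return bytesAppended; }

    public static void main(String[] args) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        SketchRecordsBuilder builder = new SketchRecordsBuilder(sink);
        builder.append(new byte[] {1, 2, 3});
        builder.closeForRecordAppends();
        System.out.println(builder.recordCount() + " " + builder.bytesAppended()); // 1 7
    }
}
```

Compared with option 1, this releases whatever stream sits underneath, so it does not depend on each compression codec cleaning up after itself.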
> Fix #4 is likely necessary if we want to bound the maximum memory usage of
> Kafka. Removing the buffers with Fix #1 or #2 will reduce the memory usage by
> ~90%, but theoretically there is still no limit.
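Fix #4 amounts to a counting semaphore around unacknowledged batches. A minimal sketch under that assumption: `InFlightBatchLimiter` is a hypothetical name, and the hook points (reserve before a closed batch is sent, release after its produce response has been handled) are where such a limit would plausibly sit, not existing producer APIs.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical limiter for option 4 (not a Kafka class): a permit is taken
// before a closed batch is handed to the network and returned once the
// broker's response for that batch has been handled and the batch freed.
class InFlightBatchLimiter {

    private final int maxUnacknowledgedBatches;
    private final Semaphore permits;

    InFlightBatchLimiter(int maxUnacknowledgedBatches) {
        this.maxUnacknowledgedBatches = maxUnacknowledgedBatches;
        this.permits = new Semaphore(maxUnacknowledgedBatches);
    }

    // Waits up to timeoutMs for a free slot. Returning false is where the
    // possible throughput cost comes from: the sender must back off.
    boolean tryReserve(long timeoutMs) throws InterruptedException {
        return permits.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Call after the batch is acknowledged (or fails) and is deallocated.
    void release() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    // Worst-case heap held by unacknowledged batches under this limit.
    long memoryBoundBytes(long bytesPerBatch) {
        return maxUnacknowledgedBatches * bytesPerBatch;
    }

    public static void main(String[] args) throws InterruptedException {
        InFlightBatchLimiter limiter = new InFlightBatchLimiter(2);
        System.out.println(limiter.tryReserve(0)); // true
        System.out.println(limiter.tryReserve(0)); // true
        System.out.println(limiter.tryReserve(0)); // false: limit reached
        limiter.release();
        System.out.println(limiter.available());   // 1
    }
}
```

The bound is simply the permit count times the per-batch footprint, which is what makes memory usage predictable; the cost is that a slow acknowledgement path now stalls the sender instead of growing the heap.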
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)