[
https://issues.apache.org/jira/browse/FLINK-6301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15970723#comment-15970723
]
Rahul Yadav edited comment on FLINK-6301 at 4/17/17 6:18 AM:
-------------------------------------------------------------
Thanks for the update and sure, it makes sense!
A few updates:
1. We will have to upgrade the kafka-clients version to `0.10.2.0` instead of
`0.10.1.0`, as the latter does not work with old 0.10.0 brokers whereas the
former does. This is important because it avoids forcing a Kafka broker upgrade
just to fix a consumer application (Flink) issue.
2. FYI: We upgraded the kafka-clients version by overriding it in our job poms
and, as expected, it fixed the issue (a minimal sketch of such an override is
shown below). PFA the new memory profiles, which no longer show dangling calls
to the inflate() method.
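For anyone who wants to do the same, a minimal sketch of such an override in a job pom could look like this (standard kafka-clients coordinates; adjust to your own build):
{code}
<!-- Sketch: pin a newer kafka-clients so it wins over the version pulled in
     transitively through the Flink Kafka connector. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.2.0</version>
    </dependency>
  </dependencies>
</dependencyManagement>
{code}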
In our case, we were already running 0.10.0 Kafka brokers in production, so
upgrading the KafkaConnector from 09 to 10 wasn't an issue, but that may not be
true for other users. Let me discuss this with the Kafka community so that the
fix can be back-ported for the 09 clients as well.
> Flink KafkaConnector09 leaks memory on reading compressed messages due to a
> Kafka consumer bug
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-6301
> URL: https://issues.apache.org/jira/browse/FLINK-6301
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.2.0, 1.1.3, 1.1.4
> Reporter: Rahul Yadav
> Attachments: jeprof.24611.1228.i1228.heap.svg,
> jeprof.24611.1695.i1695.heap.svg, jeprof.24611.265.i265.heap.svg,
> jeprof.24611.3138.i3138.heap.svg, jeprof.24611.595.i595.heap.svg,
> jeprof.24611.705.i705.heap.svg, jeprof.24611.81.i81.heap.svg
>
>
> Hi
> We are running Flink on a standalone cluster with 8 TaskManagers having 8
> vCPUs and 8 slots each. Each host has 16 GB of RAM.
> In our jobs,
> # We are consuming gzip-compressed messages from Kafka using *KafkaConnector09*
> and use the *RocksDB* backend for checkpoint storage.
> # To debug the leak, we used *jemalloc and jeprof* to profile the sources of
> malloc calls from the java process, and attached are the profiles generated at
> various stages of the job. As we can see, apart from os.malloc and
> rocksDB.allocateNewBlock, there are additional malloc calls coming from the
> inflate() method of java.util.zip.Inflater. These calls are innocuous as long
> as the Inflater.end() method is called after use.
> # To find the sources of the inflate() calls, we used BTrace on the running
> process to dump the caller stack on each call (a sketch of such a probe follows
> the trace below). Following is the stack trace we got:
> {code}
> java.util.zip.Inflater.inflate(Inflater.java)
> java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
> java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
> java.io.DataInputStream.readFully(DataInputStream.java:195)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:253)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.innerDone(MemoryRecords.java:282)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:233)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:563)
> org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136)
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:227)
> {code}
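> For reference, a sketch of the kind of BTrace probe that can produce such a dump
> (the probe below is illustrative, not the exact script we ran; it assumes the
> classic com.sun.btrace API):
> {code}
> import com.sun.btrace.annotations.BTrace;
> import com.sun.btrace.annotations.OnMethod;
> import static com.sun.btrace.BTraceUtils.jstack;
> import static com.sun.btrace.BTraceUtils.println;
>
> // Print the caller stack every time java.util.zip.Inflater.inflate(...) is entered.
> @BTrace
> public class InflateCallers {
>     @OnMethod(clazz = "java.util.zip.Inflater", method = "inflate")
>     public static void onInflate() {
>         println("Inflater.inflate() called from:");
>         jstack();
>     }
> }
> {code}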
> The end() method on Inflater is called inside the close() method of
> *InflaterInputStream* (extended by *GZIPInputStream*), but looking through the
> Kafka consumer code, we found that the RecordsIterator does not close the
> compression stream after use and hence causes the memory leak:
> https://github.com/apache/kafka/blob/23c69d62a0cabf06c4db8b338f0ca824dc6d81a7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L210
> https://issues.apache.org/jira/browse/KAFKA-3937 was filed for this and the
> issue was fixed in 0.10.1.0, but the fix was not back-ported to previous versions.
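> To illustrate the failure mode outside of Kafka (a standalone sketch, not the
> consumer code): the Inflater's native buffers are only released by end(), which
> GZIPInputStream.close() calls, so a stream that is drained but never closed keeps
> its native memory until finalization:
> {code}
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.util.zip.GZIPInputStream;
> import java.util.zip.GZIPOutputStream;
>
> public class InflaterLeakSketch {
>
>     public static void main(String[] args) throws IOException {
>         byte[] compressed = gzip("a gzip-compressed payload".getBytes("UTF-8"));
>
>         // Leaky pattern (effectively what the old RecordsIterator did): the stream
>         // is drained but never closed, so Inflater.end() is never called and the
>         // native zlib buffers stay allocated until the finalizer eventually runs.
>         GZIPInputStream leaked = new GZIPInputStream(new ByteArrayInputStream(compressed));
>         while (leaked.read() != -1) { /* drain */ }
>
>         // Correct pattern: try-with-resources closes the stream, and
>         // GZIPInputStream.close() releases the Inflater via end().
>         try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
>             while (in.read() != -1) { /* drain */ }
>         }
>     }
>
>     private static byte[] gzip(byte[] data) throws IOException {
>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
>         try (GZIPOutputStream out = new GZIPOutputStream(bos)) {
>             out.write(data);
>         }
>         return bos.toByteArray();
>     }
> }
> {code}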
> So, I would assume that we have two paths from here:
> 1. Wait for the changes to be back-ported to the 0.9.x Kafka consumer and then
> update the kafka-clients dependency:
> https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.9/pom.xml#L40
> 2. Update the kafka-connector10 to use the 0.10.1.0 clients library instead of
> 0.10.0.1:
> https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.10/pom.xml#L40
> Apart from the master, also back-port the changes to 1.2.x for Kafka
> connector 10 and all the 1.x dependencies for Kafka connector 09.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)