[ https://issues.apache.org/jira/browse/FLINK-6301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072843#comment-16072843 ]
ASF GitHub Bot commented on FLINK-6301:
---------------------------------------

Github user vidhu5269 commented on the issue:

https://github.com/apache/flink/pull/4015

Hi @tzulitai, apologies for such a long delay. It took me quite a while to come back to this.

I ran the updated connector on the cluster and didn't see any dependency conflicts. The job using it was reading from a gzipped Avro topic and producing into two different topics: a text topic and an Avro topic. Both consumption and production worked as expected. It ran on a standalone cluster with 1 master and 2 workers, each with 8 slots. Each worker had 8 vCPUs and 8 GB of RAM. The job used 10 slots and ran on both workers. With this job, I also verified that there was no memory leak after the `kafka-clients` version change.

Following is the `mvn dependency:tree` diff between master and the version-change commit:

```
20c20
< [INFO] +- org.apache.kafka:kafka-clients:jar:0.10.0.1:compile
---
> [INFO] +- org.apache.kafka:kafka-clients:jar:0.10.2.0:compile
27,28c27,28
< [INFO] +- org.apache.kafka:kafka_2.11:jar:0.10.0.1:test
< [INFO] | +- com.101tec:zkclient:jar:0.8:test
---
> [INFO] +- org.apache.kafka:kafka_2.11:jar:0.10.2.0:test
> [INFO] | +- net.sf.jopt-simple:jopt-simple:jar:5.0.3:test
31,35c31,33
< [INFO] | +- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4:test
< [INFO] | +- net.sf.jopt-simple:jopt-simple:jar:4.9:test
< [INFO] | \- org.apache.zookeeper:zookeeper:jar:3.4.6:provided
< [INFO] | +- jline:jline:jar:0.9.94:provided
< [INFO] | \- io.netty:netty:jar:3.7.0.Final:provided
---
> [INFO] | +- com.101tec:zkclient:jar:0.10:test
> [INFO] | +- org.apache.zookeeper:zookeeper:jar:3.4.6:provided
> [INFO] | \- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4:test
70a69
> [INFO] | | +- io.netty:netty:jar:3.8.0.Final:provided
```

From what we can see here, apart from the new `kafka-clients` library, there are a few changes coming from the Kafka tests-jar (`kafka_2.11`, `test` scope) as well:

1. The tests-jar brings newer versions of `net.sf.jopt-simple:jopt-simple` (5.0.3) and `com.101tec:zkclient` (0.10), both in `test` scope.
2. `jline:jline` is no longer present in the new version.
3. Version `3.8.0.Final` of `io.netty:netty` is being pulled in (from `flakka-remote`) instead of `3.7.0.Final`.

Since the job and the unit tests worked as expected, I am assuming that these dependency changes do not break anything. Do tell me if I am missing something here.

Although these are not new dependencies, I still verified that `zkclient` and `netty` are both licensed under ASL 2.0, while `jopt-simple` uses the MIT license, which is also compatible with ASL 2.0. So we should be good as far as licensing goes.

> Flink KafkaConnector09 leaks memory on reading compressed messages due to a
> Kafka consumer bug
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-6301
> URL: https://issues.apache.org/jira/browse/FLINK-6301
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.2.0, 1.1.3, 1.1.4
> Reporter: Rahul Yadav
> Assignee: Rahul Yadav
> Fix For: 1.2.2, 1.4.0
>
> Attachments: jeprof.24611.1228.i1228.heap.svg,
> jeprof.24611.1695.i1695.heap.svg, jeprof.24611.265.i265.heap.svg,
> jeprof.24611.3138.i3138.heap.svg, jeprof.24611.595.i595.heap.svg,
> jeprof.24611.705.i705.heap.svg, jeprof.24611.81.i81.heap.svg,
> POSTFIX.jeprof.14880.1944.i1944.heap.svg,
> POSTFIX.jeprof.14880.4129.i4129.heap.svg,
> POSTFIX.jeprof.14880.961.i961.heap.svg, POSTFIX.jeprof.14880.99.i99.heap.svg,
> POSTFIX.jeprof.14880.9.i9.heap.svg
>
>
> Hi,
> We are running Flink on a standalone cluster with 8 TaskManagers having 8
> vCPUs and 8 slots each. Each host has 16 GB of RAM.
> In our jobs,
> # We are consuming gzip-compressed messages from Kafka using
> *KafkaConnector09* and use the *rocksDB* backend for checkpoint storage.
> # To debug the leak, we used *jemalloc and jeprof* to profile the sources of
> malloc calls from the Java process; attached are the profiles generated at
> various stages of the job. As we can see, apart from os.malloc and
> rocksDB.allocateNewBlock, there are additional malloc calls coming from the
> inflate() method of java.util.zip.Inflater. These calls are innocuous as long
> as the Inflater.end() method is called after its use.
> # To find the callers of the inflate() method, we used BTrace on the running
> process to dump the caller stack on each call. Following is the stack trace
> we got:
> {code}
> java.util.zip.Inflater.inflate(Inflater.java)
> java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
> java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
> java.io.DataInputStream.readFully(DataInputStream.java:195)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:253)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.innerDone(MemoryRecords.java:282)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:233)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:563)
> org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136)
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:227)
> {code}
> The end() method on Inflater is called inside the close() method of
> *InflaterInputStream* (extended by *GZIPInputStream*), but looking through the
> Kafka consumer code, we found that RecordsIterator does not close the
> compressor stream after use, which causes the memory leak:
> https://github.com/apache/kafka/blob/23c69d62a0cabf06c4db8b338f0ca824dc6d81a7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L210
> https://issues.apache.org/jira/browse/KAFKA-3937 was filed for this and the
> issue was fixed in 0.10.1.0 but not back-ported to previous versions.
> So, I would assume that we have two paths from here:
> 1. Wait for the changes to be back-ported to the 0.9.x Kafka consumer and then
> update the kafka-clients dependency:
> https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.9/pom.xml#L40
> 2. Update kafka-connector10 to use the 0.10.1.0 clients library instead of
> 0.10.0.1:
> https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.10/pom.xml#L40
> Apart from master, also back-port the changes to 1.2.x for Kafka
> connector 10 and to all the 1.x dependencies for Kafka connector 09.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
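The leak described in the quoted issue rests on a JDK contract: an `Inflater` holds native zlib memory that is freed by `Inflater.end()`, and `InflaterInputStream.close()` (inherited by `GZIPInputStream`) calls `end()` on the inflater it created. The following is a minimal sketch, assuming Java 8+ (for try-with-resources), of the read pattern from the stack trace (`DataInputStream.readFully` over a `GZIPInputStream`) with the stream closed deterministically. It is not Kafka's `RecordsIterator` code or the KAFKA-3937 patch; the class and method names (`GzipStreamCleanup`, `gzip`, `readDecompressed`) are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper class illustrating how to close gzip streams after use.
public class GzipStreamCleanup {

    // Compresses a payload so the example has gzip data to read back.
    // Closing the GZIPOutputStream writes the gzip trailer before we copy the bytes.
    static byte[] gzip(byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(payload);
        }
        return bytes.toByteArray();
    }

    // Reads exactly `length` decompressed bytes, mirroring the readFully() call
    // in the stack trace. Try-with-resources closes the DataInputStream, which
    // closes the wrapped GZIPInputStream; its close() calls Inflater.end() and
    // releases the native zlib memory right away.
    static byte[] readDecompressed(byte[] compressed, int length) throws IOException {
        byte[] result = new byte[length];
        try (DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(compressed)))) {
            in.readFully(result);
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "hello kafka".getBytes(StandardCharsets.UTF_8);
        byte[] roundTrip = readDecompressed(gzip(payload), payload.length);
        System.out.println(new String(roundTrip, StandardCharsets.UTF_8)); // prints "hello kafka"
    }
}
```

If the stream is never closed, freeing the inflater's native buffers is deferred until the garbage collector reclaims the `Inflater`. That may not happen promptly when Java heap pressure is low, which fits the native-memory growth described in the issue.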