[ https://issues.apache.org/jira/browse/FLINK-6301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072843#comment-16072843 ]

ASF GitHub Bot commented on FLINK-6301:
---------------------------------------

Github user vidhu5269 commented on the issue:

    https://github.com/apache/flink/pull/4015
  
    Hi @tzulitai 
    
    Apologies for such a long delay. It took me quite a while to come back to this.
    
    I ran the updated connector on the cluster and didn't see any dependency conflicts. The job using it was reading from a gzipped Avro topic and producing into two different topics: a text topic and an Avro topic. Both consumption and production worked as expected.
    
    It was run on a standalone cluster with 1 master and 2 workers, with 8 slots per worker. The job was using 10 slots and was running on both workers. Each worker had 8 vCPUs and 8 GB of RAM.
    
    With this job, I also verified that there was no memory leak after the `kafka-clients` version change.
    
    Following is the diff of the `mvn dependency:tree` output between master and the version-change commit:
    ```
    20c20
    < [INFO] +- org.apache.kafka:kafka-clients:jar:0.10.0.1:compile
    ---
    > [INFO] +- org.apache.kafka:kafka-clients:jar:0.10.2.0:compile
    27,28c27,28
    < [INFO] +- org.apache.kafka:kafka_2.11:jar:0.10.0.1:test
    < [INFO] |  +- com.101tec:zkclient:jar:0.8:test
    ---
    > [INFO] +- org.apache.kafka:kafka_2.11:jar:0.10.2.0:test
    > [INFO] |  +- net.sf.jopt-simple:jopt-simple:jar:5.0.3:test
    31,35c31,33
    < [INFO] |  +- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4:test
    < [INFO] |  +- net.sf.jopt-simple:jopt-simple:jar:4.9:test
    < [INFO] |  \- org.apache.zookeeper:zookeeper:jar:3.4.6:provided
    < [INFO] |     +- jline:jline:jar:0.9.94:provided
    < [INFO] |     \- io.netty:netty:jar:3.7.0.Final:provided
    ---
    > [INFO] |  +- com.101tec:zkclient:jar:0.10:test
    > [INFO] |  +- org.apache.zookeeper:zookeeper:jar:3.4.6:provided
    > [INFO] |  \- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4:test
    70a69
    > [INFO] |  |  +- io.netty:netty:jar:3.8.0.Final:provided
    ```
    As we can see, apart from the new `kafka-clients` library, there are a few other changes, mostly coming from the test-scoped Kafka jar (`kafka_2.11`):
    1. The test-scoped `kafka_2.11` jar brings in new versions of `net.sf.jopt-simple:jopt-simple` (5.0.3) and `com.101tec:zkclient` (0.10), both in `test` scope.
    2. `jline:jline` is no longer pulled in with the new version.
    3. Version `3.8.0.Final` of `io.netty:netty` is now pulled in (from `flakka-remote`) instead of `3.7.0.Final`.
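    Comparing two `mvn dependency:tree` outputs by hand is error-prone. Below is a minimal Java sketch of doing it mechanically. `DependencyTreeDiff` and its methods are hypothetical helpers, not part of Maven. The sketch assumes classifier-less `groupId:artifactId:type:version:scope` coordinates and keys entries by `groupId:artifactId`, so an artifact that appears twice in one tree only keeps its last version.

    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class DependencyTreeDiff {

        // Matches groupId:artifactId:type:version:scope as printed by `mvn dependency:tree`
        // (assumption: no classifier in the coordinate).
        private static final Pattern COORD = Pattern.compile(
                "([\\w.\\-]+):([\\w.\\-]+):(jar|test-jar|pom):([\\w.\\-]+):(\\w+)");

        /** Maps "groupId:artifactId" to "version (scope)" for every coordinate found. */
        static Map<String, String> parse(String treeOutput) {
            Map<String, String> deps = new LinkedHashMap<>();
            for (String line : treeOutput.split("\n")) {
                Matcher m = COORD.matcher(line);
                if (m.find()) {
                    deps.put(m.group(1) + ":" + m.group(2), m.group(4) + " (" + m.group(5) + ")");
                }
            }
            return deps;
        }

        /** Reports changed, removed and added artifacts between two parsed trees. */
        static Map<String, String> diff(Map<String, String> before, Map<String, String> after) {
            Map<String, String> changes = new LinkedHashMap<>();
            for (Map.Entry<String, String> e : before.entrySet()) {
                String newer = after.get(e.getKey());
                if (newer == null) {
                    changes.put(e.getKey(), e.getValue() + " -> (removed)");
                } else if (!newer.equals(e.getValue())) {
                    changes.put(e.getKey(), e.getValue() + " -> " + newer);
                }
            }
            for (Map.Entry<String, String> e : after.entrySet()) {
                if (!before.containsKey(e.getKey())) {
                    changes.put(e.getKey(), "(added) -> " + e.getValue());
                }
            }
            return changes;
        }

        public static void main(String[] args) {
            // Excerpt of the tree lines from the diff above.
            String before = String.join("\n",
                    "[INFO] +- org.apache.kafka:kafka-clients:jar:0.10.0.1:compile",
                    "[INFO] |  +- com.101tec:zkclient:jar:0.8:test",
                    "[INFO] |  +- net.sf.jopt-simple:jopt-simple:jar:4.9:test",
                    "[INFO] |     +- jline:jline:jar:0.9.94:provided");
            String after = String.join("\n",
                    "[INFO] +- org.apache.kafka:kafka-clients:jar:0.10.2.0:compile",
                    "[INFO] |  +- com.101tec:zkclient:jar:0.10:test",
                    "[INFO] |  +- net.sf.jopt-simple:jopt-simple:jar:5.0.3:test");
            diff(parse(before), parse(after)).forEach((k, v) -> System.out.println(k + ": " + v));
        }
    }
    ```

    Keying on `groupId:artifactId` rather than on whole lines means a version bump shows up as one change instead of a removal plus an addition, which is what the manual reading above does as well.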
    
    Since the job and the unit tests worked as expected, I am assuming that these dependency changes do not break anything. Please tell me if I am missing something here.
    
    Although these are not new dependencies, I still verified their licenses: `zkclient` and `netty` are both under ASL 2.0, while `jopt-simple` is under the MIT license, which is also compatible with ASL 2.0. So we should be good as far as licensing goes.
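    According to the issue description quoted below, KAFKA-3937 was fixed in kafka-clients 0.10.1.0 and not back-ported, so the bump from 0.10.0.1 to 0.10.2.0 shown in the diff moves the connector past the fix. A minimal Java sketch of that check follows. `Kafka3937Check` is a hypothetical helper, and treating every version below 0.10.1.0 as affected is an assumption taken from the issue text, not from any Kafka metadata.

    ```java
    import java.util.Arrays;

    public class Kafka3937Check {

        // Assumption from the issue text: fixed in 0.10.1.0, no back-ports to older lines.
        private static final int[] FIXED_IN = {0, 10, 1, 0};

        /** Parses the leading numeric components of a version such as "0.10.0.1". */
        static int[] numericParts(String version) {
            String[] tokens = version.split("[.\\-]");
            int[] parts = new int[tokens.length];
            int count = 0;
            for (String t : tokens) {
                if (!t.matches("\\d+")) {
                    break; // stop at qualifiers such as "SNAPSHOT"
                }
                parts[count++] = Integer.parseInt(t);
            }
            return Arrays.copyOf(parts, count);
        }

        /** Compares component by component, treating missing components as 0. */
        static int compare(int[] a, int[] b) {
            for (int i = 0; i < Math.max(a.length, b.length); i++) {
                int x = i < a.length ? a[i] : 0;
                int y = i < b.length ? b[i] : 0;
                if (x != y) {
                    return Integer.compare(x, y);
                }
            }
            return 0;
        }

        /** True if this kafka-clients version predates the KAFKA-3937 fix. */
        static boolean isAffected(String kafkaClientsVersion) {
            return compare(numericParts(kafkaClientsVersion), FIXED_IN) < 0;
        }

        public static void main(String[] args) {
            for (String v : new String[] {"0.9.0.1", "0.10.0.1", "0.10.1.0", "0.10.2.0"}) {
                System.out.println(v + " affected by KAFKA-3937: " + isAffected(v));
            }
        }
    }
    ```

    Versions are compared numerically rather than as strings, because a plain string comparison would order "0.9.0.1" after "0.10.1.0".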


> Flink KafkaConnector09 leaks memory on reading compressed messages due to a Kafka consumer bug
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6301
>                 URL: https://issues.apache.org/jira/browse/FLINK-6301
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.2.0, 1.1.3, 1.1.4
>            Reporter: Rahul Yadav
>            Assignee: Rahul Yadav
>             Fix For: 1.2.2, 1.4.0
>
>         Attachments: jeprof.24611.1228.i1228.heap.svg, 
> jeprof.24611.1695.i1695.heap.svg, jeprof.24611.265.i265.heap.svg, 
> jeprof.24611.3138.i3138.heap.svg, jeprof.24611.595.i595.heap.svg, 
> jeprof.24611.705.i705.heap.svg, jeprof.24611.81.i81.heap.svg, 
> POSTFIX.jeprof.14880.1944.i1944.heap.svg, 
> POSTFIX.jeprof.14880.4129.i4129.heap.svg, 
> POSTFIX.jeprof.14880.961.i961.heap.svg, POSTFIX.jeprof.14880.99.i99.heap.svg, 
> POSTFIX.jeprof.14880.9.i9.heap.svg
>
>
> Hi,
> We are running Flink on a standalone cluster with 8 TaskManagers, each having 8 vCPUs and 8 slots. Each host has 16 GB of RAM.
> In our jobs:
> # We are consuming gzip-compressed messages from Kafka using *KafkaConnector09* and use the *RocksDB* backend for checkpoint storage.
> # To debug the leak, we used *jemalloc and jeprof* to profile the sources of malloc calls from the Java process; attached are the profiles generated at various stages of the job. As we can see, apart from os.malloc and rocksDB.allocateNewBlock, there are additional malloc calls coming from the inflate() method of java.util.zip.Inflater. These calls are innocuous as long as Inflater.end() is called after use, since end() releases the native zlib memory.
> # To find the callers of inflate(), we used BTrace on the running process to dump the caller stack on each call. Following is the stack trace we got:
> {code}
> java.util.zip.Inflater.inflate(Inflater.java)
> java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
> java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
> java.io.DataInputStream.readFully(DataInputStream.java:195)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:253)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.innerDone(MemoryRecords.java:282)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:233)
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(MemoryRecords.java:210)
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:563)
> org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:69)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:136)
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:227)
> {code}
> The end() method of Inflater is called inside the close() method of *InflaterInputStream* (extended by *GZIPInputStream*). However, looking through the Kafka consumer code, we found that RecordsIterator does not close the decompression stream after use, which causes the memory leak:
> https://github.com/apache/kafka/blob/23c69d62a0cabf06c4db8b338f0ca824dc6d81a7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L210
> https://issues.apache.org/jira/browse/KAFKA-3937 was filed for this; the issue was fixed in 0.10.1.0 but not back-ported to earlier versions.
> So, I would assume that we have two paths from here:
> 1. Wait for the fix to be back-ported to the 0.9.x Kafka consumer, then update the kafka-clients dependency:
> https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.9/pom.xml#L40
> 2. Update kafka-connector10 to use the 0.10.1.0 clients library instead of 0.10.0.1:
> https://github.com/apache/flink/blob/release-1.2/flink-connectors/flink-connector-kafka-0.10/pom.xml#L40
> In addition to master, the changes should also be back-ported to 1.2.x for Kafka connector 10, and to all 1.x releases for Kafka connector 09.
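The leak comes down to a decompression stream that is read but never closed. The Java sketch below is not Kafka's actual KAFKA-3937 patch. It only illustrates the general pattern with the JDK classes named in the stack trace (a `DataInputStream` over a `GZIPInputStream`). `GzipCloseDemo` and its methods are hypothetical. Because the `GZIPInputStream` creates its own `Inflater`, closing the outer stream cascades to `InflaterInputStream.close()`, which calls `Inflater.end()` and frees the native zlib memory right away instead of waiting for the garbage collector.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipCloseDemo {

    /** Compresses raw bytes into a gzip payload (stand-in for a compressed Kafka batch). */
    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        } // close() writes the gzip trailer and ends the internal Deflater
        return bos.toByteArray();
    }

    /** Reads exactly n decompressed bytes, always releasing the native Inflater. */
    static byte[] readGzipped(byte[] compressed, int n) throws IOException {
        byte[] out = new byte[n];
        // try-with-resources guarantees close(): DataInputStream.close() closes the
        // GZIPInputStream, whose InflaterInputStream.close() calls Inflater.end().
        try (DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(compressed)))) {
            in.readFully(out);
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "hello kafka".getBytes(StandardCharsets.UTF_8);
        byte[] compressed = gzip(payload);
        // Decompress many times; each iteration frees its Inflater immediately on close().
        for (int i = 0; i < 10_000; i++) {
            readGzipped(compressed, payload.length);
        }
        System.out.println(new String(readGzipped(compressed, payload.length), StandardCharsets.UTF_8));
    }
}
```

If `readGzipped` skipped `close()`, each call would leave an `Inflater` holding native buffers that the JVM heap does not account for, reclaimed only when the GC eventually finalizes or cleans the object. That is consistent with the malloc growth from inflate() seen in the jeprof profiles.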



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
