[
https://issues.apache.org/jira/browse/FLINK-6613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16021747#comment-16021747
]
Robert Metzger commented on FLINK-6613:
---------------------------------------
There are some options in the Kafka consumer (which you can pass through the
configuration properties of Flink's consumer) that influence the fetch behavior.
For example, {{max.poll.records}} limits the number of records per poll, and
{{fetch.max.bytes}} caps the bytes per poll. I would try playing around with
these settings to see whether they solve the problem.
Flink itself won't buffer any records directly on the heap. The only component
in Flink that buffers data is the network stack, and that is strictly bounded
by a configuration parameter.
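The two properties above can be handed to the Flink Kafka consumer as plain
{{java.util.Properties}}. A minimal sketch of such a configuration; the
bootstrap servers, group id, topic name, and the concrete limit values are
illustrative placeholders, not values taken from this issue:

```java
import java.util.Properties;

public class KafkaFetchConfig {

    // Build consumer properties that bound how much data a single poll()
    // may return, so the consumer thread cannot pull an unbounded amount
    // of records onto the heap at once.
    public static Properties boundedFetchProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-consumer-group");        // placeholder
        // Cap the number of records returned by one poll() call.
        props.setProperty("max.poll.records", "50");
        // Cap the total bytes returned by one poll() call.
        props.setProperty("fetch.max.bytes", String.valueOf(8 * 1024 * 1024));
        return props;
    }

    public static void main(String[] args) {
        Properties props = boundedFetchProperties();
        System.out.println("max.poll.records = " + props.getProperty("max.poll.records"));
        System.out.println("fetch.max.bytes  = " + props.getProperty("fetch.max.bytes"));
        // These same properties would then be passed to the connector, e.g.:
        // new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
    }
}
```

With lower values for these limits, each fetch stays smaller, at the cost of
more round trips to the broker.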
> OOM during reading big messages from Kafka
> ------------------------------------------
>
> Key: FLINK-6613
> URL: https://issues.apache.org/jira/browse/FLINK-6613
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.2.0
> Reporter: Andrey
>
> Steps to reproduce:
> 1) Set up a task manager with a 2G heap size
> 2) Set up a job that reads messages from Kafka 0.10 (i.e. FlinkKafkaConsumer010)
> 3) Send 3300 messages of ~635KB each, i.e. ~2GB in total
> 4) OOM in the task manager.
> According to heap dump:
> 1) The KafkaConsumerThread read messages with a total size of ~1GB.
> 2) It passed them to the next operator using
> org.apache.flink.streaming.connectors.kafka.internal.Handover.
> 3) It then began to read another batch of messages.
> 4) The task manager managed to read another ~500MB of messages before the OOM.
> Expected:
> 1) Either a constraint such as "number of messages in flight", OR
> 2) Read the next batch of messages only after the previous batch has been processed, OR
> 3) Any other option that prevents the OOM.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)