[ https://issues.apache.org/jira/browse/FLINK-33231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai reassigned FLINK-33231:
-------------------------------------------

    Assignee: Tzu-Li (Gordon) Tai

> Memory leak in KafkaSourceReader if no data in consumed topic
> -------------------------------------------------------------
>
>                 Key: FLINK-33231
>                 URL: https://issues.apache.org/jira/browse/FLINK-33231
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.17.1
>            Reporter: Lauri Suurväli
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: kafka-3.0.1, kafka-3.1.0
>
>         Attachments: Screenshot 2023-10-09 at 13.02.34.png, Screenshot 
> 2023-10-09 at 14.13.43.png, Screenshot 2023-10-10 at 12.49.37.png
>
>
> *Problem description*
> Our Flink streaming job's TaskManager heap fills up when the job has nothing
> to consume and process.
> It's a simple streaming job with KafkaSource -> ProcessFunction -> KafkaSink.
> When there are no messages in the source topic, the TaskManager heap usage
> keeps increasing until the job exits after receiving a SIGTERM signal. We are
> running the job on AWS EMR with YARN.
> The problem with the TaskManager heap usage does not occur when there is data
> to process. It's also worth noting that sending a single message to the source
> topic of a streaming job that has been sitting idle and suffering from the
> memory leak causes the heap to be cleared. However, this does not resolve the
> problem, since the heap usage starts increasing again immediately after the
> message has been processed.
> !Screenshot 2023-10-10 at 12.49.37.png!
> The TaskManager heap usage percentage is calculated as:
> {code:java}
> flink.taskmanager.Status.JVM.Memory.Heap.Used * 100 / 
> flink.taskmanager.Status.JVM.Memory.Heap.Max{code}
> 
> I was able to take heap dumps of the TaskManager processes while the heap
> usage percentage was high. Heap dump analysis detected 912,355 empty
> java.util.HashMap instances retaining >= 43,793,040 bytes.
> !Screenshot 2023-10-09 at 14.13.43.png!
> The retained heap seemed to be located at:
>  
> {code:java}
> org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#offsetsToCommit{code}
>  
> !Screenshot 2023-10-09 at 13.02.34.png!
>  
> *Possible hints:*
> During the snapshotState method, an empty HashMap is added to the
> offsetsToCommit map if one does not already exist for the given checkpoint.
> [KafkaSourceReader line 107|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L107]
>  
> {code:java}
> Map<TopicPartition, OffsetAndMetadata> offsetsMap =
>         offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); 
> {code}
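> To illustrate the accumulation pattern in isolation (a standalone java.util
> sketch, not the connector code): computeIfAbsent eagerly creates an empty
> inner map per key, and if nothing ever populates or removes it, the outer map
> grows by one retained HashMap per key.
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> 
> public class EmptyMapAccumulation {
>     public static void main(String[] args) {
>         // Stand-in for offsetsToCommit: one (possibly empty) inner map per checkpoint id.
>         Map<Long, Map<String, Long>> offsetsToCommit = new HashMap<>();
>         for (long checkpointId = 1; checkpointId <= 1_000_000; checkpointId++) {
>             // Mirrors the computeIfAbsent in snapshotState: an empty map is created eagerly.
>             Map<String, Long> offsetsMap =
>                     offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
>             // With a negative starting offset nothing is ever put into offsetsMap,
>             // and no cleanup runs for empty maps, so one empty HashMap is retained
>             // per checkpoint.
>         }
>         System.out.println("Retained inner maps: " + offsetsToCommit.size()); // 1000000
>     }
> }
> {code}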
>  
> If the startingOffset for the given split is >= 0, a new entry is added to
> the map from the previous step.
> [KafkaSourceReader line 113|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java#L113]
> {code:java}
> if (split.getStartingOffset() >= 0) {
>     offsetsMap.put(
>         split.getTopicPartition(),
>         new OffsetAndMetadata(split.getStartingOffset()));
> }{code}
> If the starting offset is smaller than 0, the offsetsMap created in the first
> step stays empty. We can see from the logs that the startingOffset is -3 when
> the splits are added to the reader.
>  
> {code:java}
> Adding split(s) to reader: [[Partition: source-events-20, StartingOffset: 1, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-44, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: 
> source-events-12, StartingOffset: -3, StoppingOffset: -9223372036854775808], 
> [Partition: source-events-36, StartingOffset: 1, StoppingOffset: 
> -9223372036854775808], [Partition: source-events-4, StartingOffset: -3, 
> StoppingOffset: -9223372036854775808], [Partition: source-events-28, 
> StartingOffset: -3, StoppingOffset: -9223372036854775808]]{code}
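> For reference, the negative offsets in this log look like sentinel values
> from KafkaPartitionSplit rather than real Kafka offsets (this is my reading of
> the connector's constants, so treat the exact values as an assumption):
> {code:java}
> // Assumed sentinel offsets (verify against the connector version in use):
> static final long NO_STOPPING_OFFSET = Long.MIN_VALUE; // -9223372036854775808, the StoppingOffset above
> static final long LATEST_OFFSET = -1;
> static final long EARLIEST_OFFSET = -2;
> static final long COMMITTED_OFFSET = -3; // the StartingOffset above: "start from the committed offset"
> {code}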
>  
>  
> Entries are removed from the offsetsToCommit map once they have been
> committed to Kafka. This happens in the callback that
> KafkaSourceReader.notifyCheckpointComplete passes to the
> KafkaSourceFetcherManager.commitOffsets method.
> However, if the per-checkpoint map (committedPartitions) is empty,
> KafkaSourceFetcherManager.commitOffsets returns early, so the callback never
> runs and the entry is never removed from offsetsToCommit.
> [KafkaSourceFetcherManager line 78|https://github.com/apache/flink-connector-kafka/blob/b09928d5ef290f2a046dc1fe40b4c5cebe76f997/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/fetcher/KafkaSourceFetcherManager.java#L78]
> {code:java}
> if (offsetsToCommit.isEmpty()) {
>     return;
> } {code}
> We can observe from the logs that indeed an empty map is encountered at this 
> step:
> {code:java}
> Committing offsets {}{code}
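> Put together, a simplified paraphrase of the interaction (not the exact
> connector code; names and signatures are approximate) shows why nothing is
> ever cleaned up for an idle checkpoint: the removal from offsetsToCommit lives
> in the commit callback, and the early return means that callback is never
> invoked.
> {code:java}
> // Simplified paraphrase, not the actual connector code.
> void notifyCheckpointComplete(long checkpointId) {
>     Map<TopicPartition, OffsetAndMetadata> committedPartitions =
>             offsetsToCommit.get(checkpointId); // the empty map created in snapshotState
>     fetcherManager.commitOffsets(committedPartitions, (offsets, error) -> {
>         // Cleanup of offsetsToCommit happens only here, after a commit completes...
>         removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
>     });
> }
> 
> void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
>     if (offsets.isEmpty()) {
>         return; // ...but an empty map returns before any commit, so the callback never fires
>     }
>     // enqueue the actual commit and invoke the callback when it completes
> }
> {code}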
> *Conclusion*
> It seems that an empty map gets added to the offsetsToCommit map for each
> checkpoint. Since the startingOffset in our case is -3, the empty map never
> gets filled. During the offset commit phase the offsets for these checkpoints
> are ignored, since there is nothing to commit, but there is no cleanup either,
> so the empty maps keep accumulating.
>  
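> A possible direction for a fix (a sketch only, not an actual patch): either
> skip creating the per-checkpoint map when there is nothing to record, or prune
> entries up to and including the completed checkpoint even when nothing was
> committed for it. Assuming offsetsToCommit is a SortedMap keyed by checkpoint
> id, the pruning could look roughly like this in notifyCheckpointComplete:
> {code:java}
> // Sketch: drop every per-checkpoint map with id <= checkpointId, even if it is empty,
> // so idle checkpoints no longer leave empty HashMaps behind.
> offsetsToCommit.headMap(checkpointId + 1).clear();
> {code}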


