[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553987#comment-14553987 ]

ASF GitHub Bot commented on FLINK-2004:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/674#discussion_r30787124
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java ---
    @@ -236,13 +246,22 @@ public void restoreState(long[] state) {
        @Override
        public void commitCheckpoint(long checkpointId) {
                LOG.info("Commit checkpoint {}", checkpointId);
    -           long[] checkpointOffsets = pendingCheckpoints.remove(checkpointId);
    -           if(checkpointOffsets == null) {
    +           final int posInMap = pendingCheckpoints.indexOf(checkpointId);
    +           if(posInMap == -1) {
                        LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
                        return;
                }
    +
    +           long[] checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
                LOG.info("Got corresponding offsets {}", Arrays.toString(checkpointOffsets));
    --- End diff --
    
    I think this log message can be improved, to something like "Committing offsets {} to Kafka."


> Memory leak in presence of failed checkpoints in KafkaSource
> ------------------------------------------------------------
>
>                 Key: FLINK-2004
>                 URL: https://issues.apache.org/jira/browse/FLINK-2004
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Robert Metzger
>            Priority: Critical
>             Fix For: 0.9
>
>
> Checkpoints that fail never send a commit message to the tasks.
> Maintaining a map of all pending checkpoints introduces a memory leak, as 
> entries for failed checkpoints will never be removed.
> Approaches to fix this:
>   - The source cleans up entries from older checkpoints once a checkpoint is committed (simple implementation in a linked hash map)
>   - The commit message could include the optional state handle (source needs not maintain the map)
>   - The checkpoint coordinator could send messages for failed checkpoints?
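The first approach listed in the issue can be sketched in Java as below. This is a minimal, hypothetical sketch, not the code from pull request #674: the class `PendingCheckpoints` and its methods `add`, `commit`, and `size` are made up for illustration, offsets are kept as `long[]` as in the diff, and it assumes checkpoint ids are registered in increasing order so that insertion order equals checkpoint order. It uses `java.util.LinkedHashMap` rather than whatever map type backs the connector's `pendingCheckpoints` field.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch: keeps the offsets of pending checkpoints in insertion
 * order and, on commit, drops every entry registered before the committed one.
 */
final class PendingCheckpoints {

    // Assumption: checkpoint ids increase monotonically, so insertion order
    // of this LinkedHashMap is also checkpoint order.
    private final LinkedHashMap<Long, long[]> pending = new LinkedHashMap<>();

    /** Records the offsets snapshotted for a checkpoint. */
    void add(long checkpointId, long[] offsets) {
        pending.put(checkpointId, offsets);
    }

    /**
     * Returns the offsets of the committed checkpoint, or null if it is unknown.
     * Older entries are removed as well: their checkpoints failed or were
     * subsumed, so no commit will ever arrive for them (the leak in FLINK-2004).
     */
    long[] commit(long checkpointId) {
        if (!pending.containsKey(checkpointId)) {
            return null; // unknown id: leave the map untouched
        }
        Iterator<Map.Entry<Long, long[]>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, long[]> entry = it.next();
            long id = entry.getKey();
            long[] offsets = entry.getValue();
            it.remove(); // drop this entry, whether older or the committed one
            if (id == checkpointId) {
                return offsets;
            }
        }
        return null; // not reached: the id was present
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingCheckpoints pc = new PendingCheckpoints();
        pc.add(1L, new long[] {10L});
        pc.add(2L, new long[] {20L}); // say checkpoint 2 fails: no commit arrives
        pc.add(3L, new long[] {30L});
        long[] offsets = pc.commit(3L);
        System.out.println("Committing offsets " + Arrays.toString(offsets) + " to Kafka.");
        System.out.println("Still pending: " + pc.size());
    }
}
```

Committing checkpoint 3 also discards the entries for checkpoints 1 and 2, so the map never holds more than the checkpoints still in flight. Each commit costs time proportional to the number of entries it prunes, and every entry is pruned at most once.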



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)