[ https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553987#comment-14553987 ]
ASF GitHub Bot commented on FLINK-2004:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/674#discussion_r30787124
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java ---
@@ -236,13 +246,22 @@ public void restoreState(long[] state) {
@Override
public void commitCheckpoint(long checkpointId) {
LOG.info("Commit checkpoint {}", checkpointId);
- long[] checkpointOffsets = pendingCheckpoints.remove(checkpointId);
- if(checkpointOffsets == null) {
+ final int posInMap = pendingCheckpoints.indexOf(checkpointId);
+ if(posInMap == -1) {
LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
return;
}
+
+ long[] checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
LOG.info("Got corresponding offsets {}", Arrays.toString(checkpointOffsets));
--- End diff --
I think this log message could be improved, for example to "Committing
offsets {} to Kafka."
> Memory leak in presence of failed checkpoints in KafkaSource
> ------------------------------------------------------------
>
> Key: FLINK-2004
> URL: https://issues.apache.org/jira/browse/FLINK-2004
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 0.9
> Reporter: Stephan Ewen
> Assignee: Robert Metzger
> Priority: Critical
> Fix For: 0.9
>
>
> Tasks never receive a commit message for checkpoints that fail.
> Maintaining a map of all pending checkpoints therefore introduces a memory
> leak, as entries for failed checkpoints are never removed.
> Approaches to fix this:
> - The source cleans up entries from older checkpoints once a checkpoint is
> committed (simple implementation in a linked hash map)
> - The commit message could include the optional state handle (so the source
> need not maintain the map)
> - The checkpoint coordinator could send messages for failed checkpoints?
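The first approach can be sketched in plain Java as below. This is a minimal sketch under stated assumptions, not the code from pull request #674: it assumes checkpoint ids are registered in increasing order, that committing a checkpoint makes every older pending entry obsolete (those checkpoints failed or were subsumed and will never be committed), and it uses java.util.LinkedHashMap. The class name PendingCheckpoints and its methods add, commit and size are hypothetical.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper: remembers the offsets snapshotted for each pending
 * checkpoint, in insertion order (assumed to be checkpoint id order).
 */
final class PendingCheckpoints {

    // Insertion-ordered map: the oldest pending checkpoint comes first.
    private final LinkedHashMap<Long, long[]> pending = new LinkedHashMap<>();

    /** Called when a checkpoint is taken; stores a copy of the offsets. */
    void add(long checkpointId, long[] offsets) {
        pending.put(checkpointId, offsets.clone());
    }

    /**
     * Called when a checkpoint is committed. Returns its offsets, or null if
     * the id is unknown. Every entry older than the committed one is dropped
     * as well, so entries of failed checkpoints cannot accumulate.
     */
    long[] commit(long checkpointId) {
        if (!pending.containsKey(checkpointId)) {
            return null;
        }
        Iterator<Map.Entry<Long, long[]>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, long[]> entry = it.next();
            // Read the entry before removing it from the map.
            long id = entry.getKey();
            long[] offsets = entry.getValue();
            it.remove();
            if (id == checkpointId) {
                return offsets;
            }
        }
        return null; // not reached: containsKey was checked above
    }

    /** Number of checkpoints still waiting for a commit. */
    int size() {
        return pending.size();
    }
}
```

For example, if checkpoints 1, 2 and 3 are registered and checkpoint 1 fails, committing checkpoint 2 returns its offsets and also discards the entry for checkpoint 1, leaving only checkpoint 3 pending. Iterating from the head of the insertion-ordered map keeps the cleanup proportional to the number of entries removed.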
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)