[
https://issues.apache.org/jira/browse/FLINK-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553994#comment-14553994
]
ASF GitHub Bot commented on FLINK-2004:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/674#discussion_r30787324
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java ---
@@ -225,7 +226,16 @@ public void close() {
 	@Override
 	public void restoreState(long[] state) {
-		// we maintain the offsets in Kafka, so nothing to do.
+		if(lastOffsets == null) {
+			LOG.warn("Restore state called before open() has been called");
+			return;
+		}
+		LOG.info("Restoring state to {}", Arrays.toString(state));
+		if(lastOffsets.length != state.length) {
--- End diff ---
How about doing the sanity checks before logging what is going to happen?
That usually gives better logging insight...
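
A minimal sketch of the reordering the comment seems to suggest: validate first, then log only once the restore is actually going to happen. Note the quoted diff is truncated before the length-mismatch branch, so the exception and the final arraycopy below are assumptions, not the actual patch:

	@Override
	public void restoreState(long[] state) {
		// Sanity checks first ...
		if (lastOffsets == null) {
			LOG.warn("restoreState() called before open()");
			return;
		}
		if (lastOffsets.length != state.length) {
			// Assumed handling; the quoted diff ends before this branch.
			throw new IllegalStateException("Cannot restore state: expected "
					+ lastOffsets.length + " offsets but got " + state.length);
		}
		// ... then log what is actually about to happen.
		LOG.info("Restoring state to {}", Arrays.toString(state));
		System.arraycopy(state, 0, lastOffsets, 0, state.length);
	}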
> Memory leak in presence of failed checkpoints in KafkaSource
> ------------------------------------------------------------
>
> Key: FLINK-2004
> URL: https://issues.apache.org/jira/browse/FLINK-2004
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 0.9
> Reporter: Stephan Ewen
> Assignee: Robert Metzger
> Priority: Critical
> Fix For: 0.9
>
>
> Checkpoints that fail never send a commit message to the tasks.
> Maintaining a map of all pending checkpoints introduces a memory leak, as
> entries for failed checkpoints will never be removed.
> Approaches to fix this:
> - The source cleans up entries from older checkpoints once a checkpoint is
> committed (simple implementation in a linked hash map; see the sketch below)
> - The commit message could include the optional state handle (source needs
> not maintain the map)
> - The checkpoint coordinator could send messages for failed checkpoints?
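
A hedged sketch of the first approach, not the actual fix: since checkpoint ids are ascending and a LinkedHashMap preserves insertion order, committing checkpoint N can also drop every older entry, so state kept for failed checkpoints stays bounded. All names here (PendingCheckpointOffsets, onCheckpoint, onCommit) are hypothetical:

	import java.util.Iterator;
	import java.util.LinkedHashMap;
	import java.util.Map;

	class PendingCheckpointOffsets {
		// Insertion order equals checkpoint-id order, since ids are ascending.
		private final LinkedHashMap<Long, long[]> pending = new LinkedHashMap<>();

		// Called when a checkpoint is triggered: remember the offsets it covers.
		synchronized void onCheckpoint(long checkpointId, long[] offsets) {
			pending.put(checkpointId, offsets.clone());
		}

		// Called when a checkpoint is committed: return the offsets to commit
		// to Kafka (null if already pruned) and remove this entry together
		// with all older, possibly failed ones, so they cannot accumulate.
		synchronized long[] onCommit(long checkpointId) {
			long[] toCommit = pending.get(checkpointId);
			Iterator<Map.Entry<Long, long[]>> it = pending.entrySet().iterator();
			while (it.hasNext() && it.next().getKey() <= checkpointId) {
				it.remove();
			}
			return toCommit;
		}
	}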
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)