[
https://issues.apache.org/jira/browse/FLINK-2296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14611702#comment-14611702
]
Rico Bergmann edited comment on FLINK-2296 at 7/2/15 9:37 AM:
--------------------------------------------------------------
I experimented with checkpointing in version 0.9.0. I have two stateful
operators that implement the Checkpointed interface, and a persistent Kafka
source. When I enable checkpointing, the Kafka sources are checkpointed, but
my operators are not. Is this possibly a bug?
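To make the expectation concrete, here is a minimal Python sketch of the snapshot/restore contract a stateful operator takes on. It is only loosely modeled on the idea behind the `Checkpointed` interface and is not Flink's API: the `Checkpointed` protocol, `snapshot_state`, `restore_state`, `CountingOperator` and `trigger_checkpoint` are hypothetical names. What it illustrates is that a checkpoint should capture every task implementing the contract, not only the sources.

```python
from typing import Dict, Protocol, runtime_checkable


@runtime_checkable
class Checkpointed(Protocol):
    """Hypothetical stand-in for a snapshot/restore contract (not Flink's API)."""

    def snapshot_state(self, checkpoint_id: int, timestamp: int) -> object: ...

    def restore_state(self, state: object) -> None: ...


class CountingOperator:
    """Stateful operator: counts the records it has processed."""

    def __init__(self) -> None:
        self.count = 0

    def process(self, record: str) -> None:
        self.count += 1

    def snapshot_state(self, checkpoint_id: int, timestamp: int) -> int:
        # The returned value is what the checkpoint stores for this operator.
        return self.count

    def restore_state(self, state: int) -> None:
        # Called on recovery with the value from the last completed checkpoint.
        self.count = state


class StatelessMap:
    """Operator without state: nothing to snapshot."""

    def process(self, record: str) -> None:
        pass


def trigger_checkpoint(tasks: Dict[str, object], checkpoint_id: int,
                       timestamp: int) -> Dict[str, object]:
    """Snapshot every task that implements the contract, whether or not it is a source."""
    return {
        name: task.snapshot_state(checkpoint_id, timestamp)
        for name, task in tasks.items()
        if isinstance(task, Checkpointed)
    }


# Usage: process three records, checkpoint, then restore into a fresh instance.
tasks = {"counter": CountingOperator(), "map": StatelessMap()}
for record in ["a", "b", "c"]:
    tasks["counter"].process(record)
snapshot = trigger_checkpoint(tasks, checkpoint_id=1, timestamp=0)

restored = CountingOperator()
restored.restore_state(snapshot["counter"])
```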
> Checkpoint committing broken
> ----------------------------
>
> Key: FLINK-2296
> URL: https://issues.apache.org/jira/browse/FLINK-2296
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Robert Metzger
> Assignee: Robert Metzger
> Priority: Blocker
>
> While working on fixing the failing {{PersistentKafkaSource}} test, I
> realized that the recent "New operator state interfaces" pull request
> https://github.com/apache/flink/pull/747 (sadly, there is no JIRA for this
> huge change) introduced some changes that I was not aware of.
> * The {{CheckpointCoordinator}} now sends the StateHandle back to the
> TaskManager when confirming a checkpoint. For the non-FS case, this means
> that for operators that commit checkpoints, the state is sent over the
> wire twice for each checkpoint.
> For the FS case, this means that for every checkpoint commit, the state needs
> to be retrieved from the file system.
> Did you conduct any tests on a cluster to measure the performance impact of
> that?
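A rough back-of-the-envelope model of the overhead described here, in Python. It only counts bulk state bytes for one subtask and one checkpoint, treats small control messages (checkpoint IDs, state handles) as free, and does not count file-system writes, which are the same in every variant. `Traffic` and `per_checkpoint_traffic` are hypothetical names, and the printed figures are arithmetic, not measurements from a cluster.

```python
from dataclasses import dataclass


@dataclass
class Traffic:
    wire_bytes: int = 0     # state bytes between TaskManager and JobManager
    fs_read_bytes: int = 0  # state bytes read back from the file system


def per_checkpoint_traffic(state_bytes: int, fs_backend: bool,
                           send_state_on_confirm: bool) -> Traffic:
    """Bulk state traffic for one subtask and one checkpoint.

    Small control messages (checkpoint IDs, file handles) are treated as free,
    and file-system writes are not counted.
    """
    t = Traffic()
    if fs_backend:
        # The snapshot goes to the file system; only a handle is acknowledged.
        if send_state_on_confirm:
            # The handle comes back on confirm, so the state must be re-read to commit.
            t.fs_read_bytes += state_bytes
    else:
        # The snapshot itself travels to the JobManager in the acknowledgement ...
        t.wire_bytes += state_bytes
        if send_state_on_confirm:
            # ... and travels back again with the confirmation.
            t.wire_bytes += state_bytes
    return t


MiB = 1024 * 1024
for fs_backend in (False, True):
    for send_state in (True, False):
        t = per_checkpoint_traffic(10 * MiB, fs_backend, send_state)
        print(f"fs={fs_backend} send_state={send_state}: "
              f"wire={t.wire_bytes // MiB} MiB, fs_read={t.fs_read_bytes // MiB} MiB")
```

Multiplying by the parallelism and the checkpoint frequency gives a first estimate of what a cluster test would need to confirm.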
> I see three approaches for fixing the aforementioned issue:
> - keep it this way (probably poor performance)
> - always keep the state for uncommitted checkpoints in the TaskManager's
> memory. This requires a good eviction strategy, and I don't know the
> implications for large state.
> - change the interface and do not provide the state to the user function
> (the old behavior). This forces users to think about how they want to keep
> the state (but it is also a bit more work for them).
> I would like to get some feedback on how to solve this issue!
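The second approach could look roughly like the following Python sketch. The eviction policy is an assumption, not something settled in this issue: it caps the number of retained uncommitted snapshots and, on commit, drops every snapshot up to and including the committed one, assuming checkpoint IDs increase monotonically and a later checkpoint subsumes earlier ones. Under the third approach, the same bookkeeping would move into the user function instead. `PendingCheckpointStates` and its methods are hypothetical.

```python
from collections import OrderedDict
from typing import Optional


class PendingCheckpointStates:
    """Hypothetical TaskManager-side buffer of snapshots for uncommitted checkpoints."""

    def __init__(self, max_pending: int) -> None:
        self.max_pending = max_pending
        # checkpoint id -> state, oldest first (ids assumed to increase monotonically)
        self._pending: "OrderedDict[int, object]" = OrderedDict()

    def __len__(self) -> int:
        return len(self._pending)

    def on_snapshot(self, checkpoint_id: int, state: object) -> None:
        self._pending[checkpoint_id] = state
        # Bound memory use: evict the oldest uncommitted snapshots beyond the cap.
        while len(self._pending) > self.max_pending:
            self._pending.popitem(last=False)

    def on_commit(self, checkpoint_id: int) -> Optional[object]:
        """Return the state for a committed checkpoint, or None if it was already evicted."""
        state = self._pending.get(checkpoint_id)
        # A committed checkpoint subsumes all earlier ones, so drop those as well.
        for cid in [c for c in self._pending if c <= checkpoint_id]:
            del self._pending[cid]
        return state


buffer = PendingCheckpointStates(max_pending=2)
buffer.on_snapshot(1, "state-1")
buffer.on_snapshot(2, "state-2")
buffer.on_snapshot(3, "state-3")  # evicts checkpoint 1
evicted = buffer.on_commit(1)     # None: checkpoint 1 is gone
committed = buffer.on_commit(3)   # "state-3"; checkpoints 2 and 3 are dropped
```

The `None` case is where the large-state concern bites: once a snapshot has been evicted, the commit has to fall back to some other source of the state, and this sketch deliberately leaves that fallback open.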
> Also, I discovered the following bugs:
> * Non-source tasks didn't get {{commitCheckpoint}} calls, even though they
> implemented the {{CheckpointCommitter}} interface. I fixed this issue in my
> current branch.
> * The state passed to the {{commitCheckpoint}} method did not match the
> subtask ID, so user functions were receiving states from other parallel
> instances. This led to faulty behavior in the KafkaSource (that's also the
> reason why the KafkaITCase was failing more frequently ...). I fixed this
> issue in my current branch.
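Both fixes amount to two rules for delivering commit notifications, sketched here in Python: notify every task whose function implements the committer hook, whether or not it is a source, and look up the acknowledged state by the task's own subtask index. This illustrates the intended behavior only and is not the code from the branch mentioned in the issue; `Task`, `CommittingFunction`, `notify_commit` and the `(vertex, subtask_index)` keying are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class CommittingFunction:
    """Hypothetical user function that wants to be told when a checkpoint is committed."""
    committed: List[Tuple[int, object]] = field(default_factory=list)

    def commit_checkpoint(self, checkpoint_id: int, state: object) -> None:
        self.committed.append((checkpoint_id, state))


@dataclass
class Task:
    vertex: str
    subtask_index: int
    is_source: bool
    function: object


def notify_commit(tasks: List[Task], checkpoint_id: int,
                  acked_states: Dict[Tuple[str, int], object]) -> None:
    """Deliver a commit notification together with each subtask's own acknowledged state."""
    for task in tasks:
        # Rule 1: filter on the capability, never on task.is_source.
        if not hasattr(task.function, "commit_checkpoint"):
            continue
        # Rule 2: key the state by (vertex, subtask index) so that parallel
        # instances never receive each other's state.
        state = acked_states[(task.vertex, task.subtask_index)]
        task.function.commit_checkpoint(checkpoint_id, state)


tasks = [
    Task("kafka-source", 0, True, CommittingFunction()),
    Task("kafka-source", 1, True, CommittingFunction()),
    Task("sink", 0, False, CommittingFunction()),
    Task("map", 0, False, object()),  # does not implement the hook
]
acked = {
    ("kafka-source", 0): "offsets-0",
    ("kafka-source", 1): "offsets-1",
    ("sink", 0): "sink-state-0",
}
notify_commit(tasks, 7, acked)
```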
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)