[ https://issues.apache.org/jira/browse/FLINK-2296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14611790#comment-14611790 ]
Rico Bergmann commented on FLINK-2296:
--------------------------------------
I am referring to state checkpointing. The checkpoints should be stored in the
local file system (I use a FileStateHandler).
The job starts with three PersistentKafkaSources. Each one is followed by a
stateless map (basically data type conversions) and then by a hash partitioner.
Two of the streams are then connected with a stateful RichCoMapFunction (which
should be checkpointed). The resulting stream is connected to the third stream
through another RichCoMapFunction (which should also be checkpointed).
Complex computations follow after that, but I don't think they matter for
this issue.
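Below is a minimal plain-Java model of one stateful stage of the job described above. It does not use Flink. `partition`, `JoiningCoMap` and `snapshot` are hypothetical stand-ins, and Flink's real hash partitioner may compute the target instance differently. The model shows why the co-map has to be checkpointed. Both inputs are hash-partitioned on the same key, so each parallel co-map instance builds up per-key state from two streams, and a checkpoint has to capture a copy of that state.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of one stateful co-map stage (no Flink dependency).
// partition(), JoiningCoMap and snapshot() are hypothetical stand-ins.
class TopologySketch {

    // Hypothetical hash partitioner: records with the same key always go to
    // the same parallel instance (floorMod keeps the index non-negative).
    static int partition(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Stateful co-map: remembers the latest value per key from each input.
    // These two maps are the operator state a checkpoint has to capture.
    static class JoiningCoMap {
        private final Map<String, String> first = new HashMap<>();
        private final Map<String, String> second = new HashMap<>();

        String map1(String key, String value) {
            first.put(key, value);
            return key + ":" + value + "|" + second.getOrDefault(key, "-");
        }

        String map2(String key, String value) {
            second.put(key, value);
            return key + ":" + first.getOrDefault(key, "-") + "|" + value;
        }

        // A checkpoint stores a copy, so later updates do not alter it.
        Map<String, Map<String, String>> snapshot() {
            Map<String, Map<String, String>> copy = new HashMap<>();
            copy.put("first", new HashMap<>(first));
            copy.put("second", new HashMap<>(second));
            return copy;
        }
    }

    public static void main(String[] args) {
        int parallelism = 4;
        JoiningCoMap coMap = new JoiningCoMap();
        // Both inputs are partitioned on the same key, so "a" from either
        // stream reaches the same co-map instance.
        System.out.println("instance " + partition("a", parallelism));
        System.out.println(coMap.map1("a", "1")); // a:1|-
        Map<String, Map<String, String>> checkpoint = coMap.snapshot();
        System.out.println(coMap.map2("a", "x")); // a:1|x
        System.out.println(checkpoint.get("second").isEmpty()); // true
    }
}
```

The same structure would be chained once more for the third stream, with the output of the first co-map as one of its inputs.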
> Checkpoint committing broken
> ----------------------------
>
> Key: FLINK-2296
> URL: https://issues.apache.org/jira/browse/FLINK-2296
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Robert Metzger
> Assignee: Robert Metzger
> Priority: Blocker
>
> While working on fixing the failing {{PersistentKafkaSource}} test, I
> realized that the recent "New operator state interfaces" change
> https://github.com/apache/flink/pull/747 (sadly, there is no JIRA issue
> for this huge change) altered some behavior that I was not aware of.
> * The {{CheckpointCoordinator}} now sends the StateHandle back to the
> TaskManager when confirming a checkpoint. For the non-FS case, this means
> that for operators that commit checkpoints, the state is sent twice over
> the wire for each checkpoint.
> For the FS case, this means that for every checkpoint commit, the state needs
> to be retrieved from the file system.
> Did you conduct any tests on a cluster to measure the performance impact of
> that?
> I see three approaches for fixing this issue:
> - keep it this way (probably poor performance)
> - always keep the state of uncommitted checkpoints in the TaskManager's
> memory. This requires a good eviction strategy, and I don't know the
> implications for large state.
> - change the interface and do not provide the state to the user function
> (the old behavior). This forces users to think about how they want to keep
> the state (but it is also a bit more work for them).
> I would like to get some feedback on how to solve this issue!
> Also, I discovered the following bugs:
> * Non-source tasks didn't get {{commitCheckpoint}} calls, even though they
> implemented the {{CheckpointCommitter}} interface. I fixed this issue in my
> current branch.
> * The state passed to the {{commitCheckpoint}} method did not match the
> subtask id, so user functions were receiving states from other parallel
> instances. This led to faulty behavior in the KafkaSource (that's also the
> reason why the KafkaITCase was failing more often ...). I fixed this
> issue in my current branch.
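The first point of the description says that the state is sent back to the TaskManager on every confirmation. A simplified cost model puts rough numbers on that. The model makes these assumptions: `stateBytes` is the serialized state per subtask, a checkpoint ID costs 8 bytes on the wire, and handle/RPC metadata is ignored. The 100 MiB state size and the 5 s interval in `main` are invented examples, not measurements. The method names are hypothetical.

```java
// Simplified, hypothetical cost model of the checkpoint confirm path.
// Assumptions: 8-byte checkpoint ID, no metadata overhead counted.
class ConfirmCostSketch {

    // Non-FS case: the state goes TaskManager -> JobManager on snapshot, and
    // either the full state or just the checkpoint ID comes back on confirm.
    static long wireBytesPerCheckpoint(long stateBytes, boolean stateSentOnConfirm) {
        long snapshot = stateBytes;
        long confirm = stateSentOnConfirm ? stateBytes : Long.BYTES;
        return snapshot + confirm;
    }

    // FS case: only a small handle travels, but resolving it on confirm
    // means reading the whole state back from the file system.
    static long fsBytesReadPerCommit(long stateBytes, boolean stateSentOnConfirm) {
        return stateSentOnConfirm ? stateBytes : 0L;
    }

    public static void main(String[] args) {
        long mib = 1L << 20;
        long state = 100 * mib;      // assumed: 100 MiB of state per subtask
        long intervalSeconds = 5;    // assumed: one checkpoint every 5 s
        long withState = wireBytesPerCheckpoint(state, true);
        long idOnly = wireBytesPerCheckpoint(state, false);
        System.out.println("state sent back: " + withState / mib / intervalSeconds + " MiB/s");
        System.out.println("ID only:         " + idOnly / mib / intervalSeconds + " MiB/s");
        System.out.println("FS read per commit: " + fsBytesReadPerCommit(state, true) / mib + " MiB");
    }
}
```

Under these assumptions, sending the state back roughly doubles the per-checkpoint network traffic for committing operators (40 vs. 20 MiB/s here). In the FS case, every commit also adds a full read of the state.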
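The second approach in the quoted list keeps uncommitted checkpoint state in the TaskManager's memory. It could look roughly like the sketch below. The same structure would live inside the user function under the third approach. The sketch assumes the following:

* Checkpoint IDs increase monotonically.
* Committing checkpoint n makes every older pending checkpoint obsolete, as with Kafka offsets.
* A fixed `maxPending` bound stands in for a real eviction strategy.

`PendingCheckpoints` and its methods are hypothetical names, not a Flink API.

```java
import java.util.TreeMap;

// Hypothetical in-memory store for uncommitted checkpoint state.
// Assumes monotonically increasing checkpoint IDs and that committing
// checkpoint n makes all older pending checkpoints obsolete.
class PendingCheckpoints<S> {
    private final TreeMap<Long, S> pending = new TreeMap<>();
    private final int maxPending; // stand-in for a real eviction strategy

    PendingCheckpoints(int maxPending) {
        if (maxPending < 1) throw new IllegalArgumentException("maxPending must be >= 1");
        this.maxPending = maxPending;
    }

    // Called when a snapshot is taken: remember the state locally.
    void onSnapshot(long checkpointId, S state) {
        pending.put(checkpointId, state);
        // Evict the oldest pending checkpoints once the bound is exceeded.
        while (pending.size() > maxPending) {
            pending.pollFirstEntry();
        }
    }

    // Called when the checkpoint is confirmed. Only the ID is needed, so
    // nothing is shipped back or read from the file system. Returns null
    // if the state was already evicted.
    S onCommit(long checkpointId) {
        S state = pending.get(checkpointId);
        // Drop this checkpoint and every older one.
        pending.headMap(checkpointId, true).clear();
        return state;
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingCheckpoints<String> p = new PendingCheckpoints<>(2);
        p.onSnapshot(1, "offsets@1");
        p.onSnapshot(2, "offsets@2");
        p.onSnapshot(3, "offsets@3");      // evicts checkpoint 1
        System.out.println(p.onCommit(1)); // null (already evicted)
        System.out.println(p.onCommit(3)); // offsets@3
        System.out.println(p.size());      // 0
    }
}
```

The open question from the description shows up here as a trade-off. A small bound keeps memory usage predictable for large state, but a late confirmation may then find its state already evicted. The caller has to handle the `null` case, for example by skipping that commit and relying on a later one.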
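The first bug in the quoted list is that non-source tasks did not receive `commitCheckpoint` calls. The difference between the buggy and the fixed dispatch can be modeled as follows. `Committer`, `Operator` and both notify methods are hypothetical stand-ins, not Flink's actual classes. The point is only that the notification should depend on whether an operator implements the committer interface, not on whether it is a source.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of checkpoint-commit dispatch (not Flink's classes).
class CommitDispatchSketch {

    interface Committer {
        void commitCheckpoint(long checkpointId);
    }

    static class Operator {
        final String name;
        final boolean isSource;

        Operator(String name, boolean isSource) {
            this.name = name;
            this.isSource = isSource;
        }
    }

    // An operator that wants commit notifications, e.g. a stateful co-map.
    static class CommittingOperator extends Operator implements Committer {
        final List<Long> committed = new ArrayList<>();

        CommittingOperator(String name, boolean isSource) {
            super(name, isSource);
        }

        @Override
        public void commitCheckpoint(long checkpointId) {
            committed.add(checkpointId);
        }
    }

    // Buggy behavior: only source operators are notified.
    static void notifySourcesOnly(List<Operator> operators, long checkpointId) {
        for (Operator op : operators) {
            if (op.isSource && op instanceof Committer) {
                ((Committer) op).commitCheckpoint(checkpointId);
            }
        }
    }

    // Fixed behavior: every operator implementing the interface is notified,
    // whether or not it is a source.
    static void notifyAllCommitters(List<Operator> operators, long checkpointId) {
        for (Operator op : operators) {
            if (op instanceof Committer) {
                ((Committer) op).commitCheckpoint(checkpointId);
            }
        }
    }

    public static void main(String[] args) {
        CommittingOperator source = new CommittingOperator("kafka-source", true);
        CommittingOperator coMap = new CommittingOperator("co-map", false);
        List<Operator> ops = List.of(source, new Operator("map", false), coMap);

        notifySourcesOnly(ops, 1L);
        System.out.println(coMap.committed); // []
        notifyAllCommitters(ops, 2L);
        System.out.println(coMap.committed); // [2]
    }
}
```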
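The second bug in the quoted list is that the state passed to `commitCheckpoint` belonged to a different subtask. The sketch below shows one way such a mix-up can arise. It is a hypothetical illustration, not a description of the actual cause in Flink's code. The buggy router pairs acknowledged states with subtasks by arrival order, but acknowledgements from parallel subtasks can arrive in any order. The fixed router keys each state by the index of the subtask that produced it. `Ack` and both routing methods are invented names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of routing acknowledged states back to subtasks on
// commit; Ack and the routing methods are illustrative stand-ins.
class SubtaskStateRoutingSketch {

    // A state acknowledgement as it arrives from one parallel subtask.
    static final class Ack {
        final int subtaskIndex;
        final String state;

        Ack(int subtaskIndex, String state) {
            this.subtaskIndex = subtaskIndex;
            this.state = state;
        }
    }

    // Buggy: pairs states with subtasks by arrival order. Acks from parallel
    // subtasks race, so subtask 0 may receive subtask 1's state.
    static Map<Integer, String> routeByArrival(List<Ack> acks) {
        Map<Integer, String> out = new HashMap<>();
        for (int i = 0; i < acks.size(); i++) {
            out.put(i, acks.get(i).state);
        }
        return out;
    }

    // Fixed: key every state by the subtask index that produced it.
    static Map<Integer, String> routeBySubtask(List<Ack> acks) {
        Map<Integer, String> out = new HashMap<>();
        for (Ack ack : acks) {
            out.put(ack.subtaskIndex, ack.state);
        }
        return out;
    }

    public static void main(String[] args) {
        // Subtask 1 happens to acknowledge before subtask 0.
        List<Ack> acks = List.of(new Ack(1, "offsets-of-1"), new Ack(0, "offsets-of-0"));
        System.out.println(routeByArrival(acks).get(0)); // offsets-of-1 (wrong)
        System.out.println(routeBySubtask(acks).get(0)); // offsets-of-0
    }
}
```

For a Kafka source, receiving another instance's state means committing offsets for partitions that this instance does not read. This matches the kind of faulty behavior the description reports.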
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)