[ 
https://issues.apache.org/jira/browse/FLINK-2296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-2296:
----------------------------------
    Description: 
While working on fixing the failing {{PersistentKafkaSource}} test, I realized 
that the recent changes introduced in "New operator state interfaces" 
https://github.com/apache/flink/pull/747 (sadly, there is no JIRA for this huge 
change) introduced some changes that I was not aware of.

* The {{CheckpointCoordinator}} is now sending the StateHandle back to the 
TaskManager when confirming a checkpoint. For the non-FS case, this means that 
for operators that commit checkpoints, the state is sent over the wire twice 
for each checkpoint. 
For the FS case, this means that for every checkpoint commit, the state needs 
to be retrieved from the file system.
Did you conduct any tests on a cluster to measure the performance impact of 
that?

I see three approaches for fixing the aforementioned issue:
- keep it this way (probably poor performance)
- always keep the state for uncommitted checkpoints in the TaskManager's 
memory. This would require a good eviction strategy, and I don't know the 
implications for large state.
- change the interface and do not provide the state to the user function (=old 
behavior). This forces users to think about how they want to keep the state 
(but it is also a bit more work for them)
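
To make the second option more concrete, here is a minimal sketch of a task-local holder for the state of uncommitted checkpoints. It is not Flink code: the class {{PendingCheckpointStates}}, its methods, and the "keep at most N pending checkpoints, evict the oldest" policy are all assumptions made for illustration. It also assumes that confirming checkpoint N makes all older pending checkpoints obsolete.

```java
import java.util.TreeMap;

/**
 * Hypothetical sketch (not part of Flink): keeps the state of checkpoints that
 * have been snapshotted but not yet confirmed, so that a commit notification
 * only needs to carry the checkpoint id.
 */
final class PendingCheckpointStates<S> {

    private final int maxPending;
    // Sorted by checkpoint id, so the oldest pending checkpoint comes first.
    private final TreeMap<Long, S> pending = new TreeMap<>();

    PendingCheckpointStates(int maxPending) {
        if (maxPending < 1) {
            throw new IllegalArgumentException("maxPending must be >= 1");
        }
        this.maxPending = maxPending;
    }

    /** Remembers the state that was snapshotted for the given checkpoint. */
    synchronized void onSnapshot(long checkpointId, S state) {
        pending.put(checkpointId, state);
        // Assumed eviction policy: drop the oldest entries beyond the bound.
        while (pending.size() > maxPending) {
            pending.pollFirstEntry();
        }
    }

    /**
     * Returns the locally kept state for a confirmed checkpoint, or null if it
     * was evicted or never seen. The confirmed checkpoint and all older
     * pending ones are dropped afterwards.
     */
    synchronized S onCommit(long checkpointId) {
        S state = pending.get(checkpointId);
        pending.headMap(checkpointId, true).clear();
        return state;
    }

    /** Number of checkpoints whose state is currently held in memory. */
    synchronized int size() {
        return pending.size();
    }
}
```

A caller has to handle the {{null}} case, for example by falling back to fetching the state the way it is done today. The bound is counted in checkpoints, not bytes, so it says nothing yet about the large-state question raised above.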

I would like to get some feedback on how to solve this issue!


Also, I discovered the following bugs:
* Non-source tasks didn't get {{commitCheckpoint}} calls, even though they 
implemented the {{CheckpointCommitter}} interface. I fixed this issue in my 
current branch.
* The state passed to the {{commitCheckpoint}} method did not match the 
subtask id, so user functions were receiving states from other parallel 
instances. This led to faulty behavior in the KafkaSource (that's also the 
reason why the KafkaITCase was failing more frequently ...). I fixed this issue 
in my current branch.
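
This report does not say how the mismatch arose. As an illustration only, one way to make this kind of mix-up harder is to key acknowledged state by subtask index instead of relying on list position or arrival order. The class {{SubtaskStateRegistry}} and its methods below are hypothetical and are not the fix in my branch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical sketch (not Flink's actual classes): stores the state that each
 * parallel instance acknowledged for one checkpoint, keyed by subtask index.
 */
final class SubtaskStateRegistry<S> {

    private final int parallelism;
    private final Map<Integer, S> stateBySubtask = new HashMap<>();

    SubtaskStateRegistry(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        this.parallelism = parallelism;
    }

    /** Records the state acknowledged by one parallel instance. */
    void acknowledge(int subtaskIndex, S state) {
        checkIndex(subtaskIndex);
        Objects.requireNonNull(state, "state");
        if (stateBySubtask.putIfAbsent(subtaskIndex, state) != null) {
            throw new IllegalStateException(
                    "Duplicate acknowledgement from subtask " + subtaskIndex);
        }
    }

    /** Returns the state that belongs to exactly this subtask. */
    S stateFor(int subtaskIndex) {
        checkIndex(subtaskIndex);
        S state = stateBySubtask.get(subtaskIndex);
        if (state == null) {
            throw new IllegalStateException(
                    "No state acknowledged by subtask " + subtaskIndex);
        }
        return state;
    }

    private void checkIndex(int subtaskIndex) {
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IndexOutOfBoundsException("subtask index " + subtaskIndex);
        }
    }
}
```

With such a lookup, acknowledgements can arrive in any order (for example subtask 2, then 0, then 1) and {{stateFor(0)}} still returns the state of subtask 0.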


  was:
While working on fixing the failing {{PersistentKafkaSource}} test, I realized 
that the recent changes introduced in "New operator state interfaces" 
https://github.com/apache/flink/pull/747 (sadly, there is no JIRA for this huge 
change) introduced some changes that I was not aware of.

* The {{CheckpointCooridnator}} is now sending the StateHandle back to the 
TaskManager when confirming a checkpoint. For the non-FS case, this means that 
for checkpoint committed operators, the state is send twice over the wire for 
each checkpoint. 
For the FS case, this means that for every checkpoint commit, the state needs 
to be retrieved from the file system.
Did you conduct any tests on a cluster to measure the performance impact of 
that?

I see three approaches for fixing the aforementioned issue:
- keep it this way (probably poor performance)
- always keep the state for uncommitted checkpoints in the TaskManager's 
memory. Therefore, we need to come up with a good eviction strategy. I don't 
know the implications for large state.
- change the interface and do not provide the state to the user function (=old 
behavior). This forces users to think about how they want to keep the state 
(but it is also a bit more work for them)

I would like to get some feedback on how to solve this issue!


Also, I discovered the following bugs:
* Non-source tasks didn't get {{commitCheckpoint}} calls, even though they 
implemented the {{CheckpointCommitter}} interface. I fixed this issue in my 
current branch.
* The state passed to the {{commitCheckpoint}} method did not match with the 
subtask id. So user functions were receiving states from other parallel 
instances. This lead to faulty behavior in the KafkaSource (thats also the 
reason why the KafkaITCase was failing more frequently ...). I fixed this issue 
in my current branch.



> Checkpoint committing broken
> ----------------------------
>
>                 Key: FLINK-2296
>                 URL: https://issues.apache.org/jira/browse/FLINK-2296
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 0.10
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>            Priority: Blocker



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
