[ 
https://issues.apache.org/jira/browse/FLINK-30070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-30070:
-----------------------------------
    Description: 
Side effects are any external state, that is, state stored not in Flink but in 
an external system, for example connector transactions (KafkaSink, ...).

We shouldn't be relying on external systems to store part of the job's state, 
especially for any long period of time. The most prominent issue is that Kafka 
transactions can time out, leading to data loss if the transaction hasn't been 
committed.
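
To make the timeout risk concrete, here is a minimal sketch of a simplified 
model: a pre-committed transaction can only be committed if the commit attempt 
arrives within the transaction timeout. The class name, the method, the 
15-minute timeout and the timestamps are all illustrative assumptions, not 
Kafka or Flink code.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified, hypothetical model; this is not Kafka or Flink code.
final class TransactionTimeoutSketch {

    /**
     * Returns true if a transaction whose last status update happened at
     * lastUpdate can still be committed at commitAttempt, given the timeout.
     */
    static boolean stillCommittable(Instant lastUpdate, Duration timeout, Instant commitAttempt) {
        Duration idle = Duration.between(lastUpdate, commitAttempt);
        return idle.compareTo(timeout) <= 0;
    }

    public static void main(String[] args) {
        // Illustrative values only.
        Instant savepointTaken = Instant.parse("2022-11-17T10:00:00Z");
        Duration timeout = Duration.ofMinutes(15);

        // Job restored 5 minutes after the savepoint: the commit can still succeed.
        System.out.println(stillCommittable(
                savepointTaken, timeout, savepointTaken.plus(Duration.ofMinutes(5)))); // true

        // Job restored 1 hour later: the transaction has timed out, its data is lost.
        System.out.println(stillCommittable(
                savepointTaken, timeout, savepointTaken.plus(Duration.ofHours(1)))); // false
    }
}
```

A savepoint can sit unused far longer than any reasonable timeout, which is 
why keeping part of the job's state in an open external transaction is fragile.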

Stop-with-savepoint currently guarantees that a {{notifyCheckpointComplete}} 
call will be issued, so properly implemented operators are guaranteed to 
commit their state. However, this information is currently not stored in the 
checkpoint in any way (FLINK-16419). A larger issue is how to deal with 
regular (intermediate) savepoints, for which we currently have no guarantee 
that transactions have been committed.

One potential solution might be to expand the API (like 
{{CheckpointedFunction}}) to let operators/functions know that they should 
close/commit/clear/deal with external state differently, and to use that API 
during stop-with-savepoint and intermediate savepoints. Note that since Flink 
1.15, intermediate savepoints are never committed, so most likely they 
shouldn't even try to store/pre-commit any external state/transactions.
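
One way such an API extension could look is sketched below. This is only a 
sketch under stated assumptions: {{SnapshotKind}}, 
{{SideEffectAwareFunction}} and {{ToyTransactionalSink}} are hypothetical 
names invented for illustration and are not part of Flink, and plain 
in-memory lists stand in for an external transactional system.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these types are part of Flink's API.
final class SideEffectFreeSavepointSketch {

    /** Hypothetical: the reason a snapshot is being taken. */
    enum SnapshotKind { CHECKPOINT, INTERMEDIATE_SAVEPOINT, STOP_WITH_SAVEPOINT }

    /** Hypothetical extension in the spirit of CheckpointedFunction. */
    interface SideEffectAwareFunction {
        /** Returns true if the snapshot references a pre-committed external transaction. */
        boolean snapshotState(long checkpointId, SnapshotKind kind);

        void notifyCheckpointComplete(long checkpointId);
    }

    /** Toy sink; in-memory lists stand in for an external transactional system. */
    static final class ToyTransactionalSink implements SideEffectAwareFunction {
        private List<String> openTransaction = new ArrayList<>();
        private final Map<Long, List<String>> preCommitted = new HashMap<>();
        final List<String> committed = new ArrayList<>();

        void write(String record) {
            openTransaction.add(record);
        }

        @Override
        public boolean snapshotState(long checkpointId, SnapshotKind kind) {
            if (kind == SnapshotKind.INTERMEDIATE_SAVEPOINT) {
                // Never committed, so leave the open transaction untouched.
                return false;
            }
            // Checkpoint or stop-with-savepoint: pre-commit and start a new transaction.
            preCommitted.put(checkpointId, openTransaction);
            openTransaction = new ArrayList<>();
            return true;
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            // Commit the transaction that was pre-committed for this snapshot, if any.
            List<String> transaction = preCommitted.remove(checkpointId);
            if (transaction != null) {
                committed.addAll(transaction);
            }
        }

        int pendingTransactions() {
            return preCommitted.size();
        }
    }

    public static void main(String[] args) {
        ToyTransactionalSink sink = new ToyTransactionalSink();
        sink.write("a");
        // The intermediate savepoint stores no external state.
        System.out.println(sink.snapshotState(1L, SnapshotKind.INTERMEDIATE_SAVEPOINT)); // false
        sink.write("b");
        // Stop-with-savepoint pre-commits, and the guaranteed notification commits.
        sink.snapshotState(2L, SnapshotKind.STOP_WITH_SAVEPOINT);
        sink.notifyCheckpointComplete(2L);
        System.out.println(sink.committed);             // [a, b]
        System.out.println(sink.pendingTransactions()); // 0
    }
}
```

In this sketch the intermediate savepoint leaves the open transaction alone, 
so the savepoint itself carries no dependency on the external system, while 
stop-with-savepoint relies on the guaranteed notification to commit everything 
before the job stops.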

  was:
Side effects are any external state, that is, state stored not in Flink but in 
an external system, for example connector transactions (KafkaSink, ...).

We shouldn't be relying on external systems to store part of the job's state, 
especially for any long period of time. The most prominent issue is that Kafka 
transactions can time out, leading to data loss if the transaction hasn't been 
committed.

Stop-with-savepoint currently guarantees that a {{notifyCheckpointComplete}} 
call will be issued, so properly implemented operators are guaranteed to 
commit their state. However, this information is currently not stored in the 
checkpoint in any way (FLINK-16419). A larger issue is how to deal with 
regular savepoints, for which we currently have no guarantee that transactions 
have been committed.

One potential solution might be to expand the API (like 
{{CheckpointedFunction}}) to let operators/functions know that they should 
close/commit/clear/deal with external state differently, and to use that API 
during stop-with-savepoint, plus rework how regular savepoints are handled.


> Create savepoints without side effects
> --------------------------------------
>
>                 Key: FLINK-30070
>                 URL: https://issues.apache.org/jira/browse/FLINK-30070
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.16.0, 1.15.2, 1.14.6
>            Reporter: Piotr Nowojski
>            Priority: Major
>
> Side effects are any external state, that is, state stored not in Flink but 
> in an external system, for example connector transactions (KafkaSink, ...).
> We shouldn't be relying on external systems to store part of the job's 
> state, especially for any long period of time. The most prominent issue is 
> that Kafka transactions can time out, leading to data loss if the 
> transaction hasn't been committed.
> Stop-with-savepoint currently guarantees that a {{notifyCheckpointComplete}} 
> call will be issued, so properly implemented operators are guaranteed to 
> commit their state. However, this information is currently not stored in the 
> checkpoint in any way (FLINK-16419). A larger issue is how to deal with 
> regular (intermediate) savepoints, for which we currently have no guarantee 
> that transactions have been committed.
> One potential solution might be to expand the API (like 
> {{CheckpointedFunction}}) to let operators/functions know that they should 
> close/commit/clear/deal with external state differently, and to use that API 
> during stop-with-savepoint and intermediate savepoints. Note that since Flink 
> 1.15, intermediate savepoints are never committed, so most likely they 
> shouldn't even try to store/pre-commit any external state/transactions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
