[ 
https://issues.apache.org/jira/browse/FLINK-32658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-32658:
----------------------------
    Description: 
When ignore-unclaimed-state is false and the old state is removed, flink should 
throw exception. It's similar to removing a stateful operator.

This case occurs not only when the user removes state, but also when the 
operator is replaced. 

For example: upgrade FlinkKafkaConsumer to KafkaSource. All logical are not 
changed, so the operator id isn't changed. The KafkaSource cannot resume from 
the state of FlinkKafkaConsumer. However, the new flink job can start, and the 
state is silently removed in the new job.(The old state is not physically 
discarded, it is still stored in the state backend, but the new code will never 
use it.)

It also brings an additional problem: the KafkaSource will snapshot 2 states, 
it includes the new state of KafkaSource, and the union list state of 
FlinkKafkaConsumer. Whenever a job resumes from checkpoint, the union List 
state is inflated. Eventually the state size of kafka offset exceeded 200MB.

 !screenshot-1.png! 


  was:
When ignore-unclaimed-state is false and the old state is removed, flink should 
throw exception. It's similar to removing a stateful operator.

This case occurs not only when the user removes state, but also when the 
operator is replaced. 

For example: upgrade FlinkKafkaConsumer to KafkaSource. All logical are not 
changed, so the operator id isn't changed. The KafkaSource cannot resume from 
the state of FlinkKafkaConsumer. However, flink job can start, and the state is 
silently discarded.(The old state is not physically discarded, it is still 
stored in the state backend, but the new code will never use it.)

It also brings an additional problem: the KafkaSource will snapshot 2 states, 
it includes the new state of KafkaSource, and the union list state of 
FlinkKafkaConsumer. Whenever a job resumes from checkpoint, the union List 
state is inflated. Eventually the state size of kafka offset exceeded 200MB.

 !screenshot-1.png! 



> State should not be silently removed when ignore-unclaimed-state is false
> -------------------------------------------------------------------------
>
>                 Key: FLINK-32658
>                 URL: https://issues.apache.org/jira/browse/FLINK-32658
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.18.0, 1.17.1
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>         Attachments: screenshot-1.png
>
>
> When ignore-unclaimed-state is false and the old state is removed, flink 
> should throw exception. It's similar to removing a stateful operator.
> This case occurs not only when the user removes state, but also when the 
> operator is replaced. 
> For example: upgrade FlinkKafkaConsumer to KafkaSource. All logical are not 
> changed, so the operator id isn't changed. The KafkaSource cannot resume from 
> the state of FlinkKafkaConsumer. However, the new flink job can start, and 
> the state is silently removed in the new job.(The old state is not physically 
> discarded, it is still stored in the state backend, but the new code will 
> never use it.)
> It also brings an additional problem: the KafkaSource will snapshot 2 states, 
> it includes the new state of KafkaSource, and the union list state of 
> FlinkKafkaConsumer. Whenever a job resumes from checkpoint, the union List 
> state is inflated. Eventually the state size of kafka offset exceeded 200MB.
>  !screenshot-1.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to