[ 
https://issues.apache.org/jira/browse/FLINK-8467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384252#comment-16384252
 ] 

Jelmer Kuperus edited comment on FLINK-8467 at 3/2/18 10:37 PM:
----------------------------------------------------------------

Sorry for the late response. I just realized that notifications go to an old 
email address.

I understand and agree that when you create a savepoint, the offsets stored in 
the savepoint should be used after a restore from that savepoint.

However, consider a job that does not use checkpointing, is restored from a 
savepoint, and then crashes, which forces an automatic restart. It will 
reprocess *all* the messages received since the savepoint. Even if this restart 
were to happen, say, a year after the job was restored from the savepoint, it 
will reprocess a year's worth of data! (Assuming the retention on your topic is 
that high, etc.)
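
To put rough numbers on this, here is a back-of-the-envelope sketch in plain 
Java. The message rate of 100 messages per second is a made-up figure for 
illustration only, and the class and method names are hypothetical. The 5000 ms 
value is the Kafka consumer's default for auto.commit.interval.ms.

```java
import java.time.Duration;

// Rough estimate only; the rate used in main() is an assumed figure, not a measurement.
final class ReplayEstimateSketch {

    /** Messages that get replayed when the job rewinds by the given amount of time. */
    static long replayedMessages(Duration gap, long messagesPerSecond) {
        return Math.multiplyExact(gap.toMillis(), messagesPerSecond) / 1000;
    }

    public static void main(String[] args) {
        long assumedRate = 100L; // hypothetical messages per second on the topic

        // Restart one year after the restore, rewinding to the savepoint offsets.
        System.out.println(replayedMessages(Duration.ofDays(365), assumedRate));     // 3153600000

        // Restart that rewinds only to the last auto-committed offsets (default interval).
        System.out.println(replayedMessages(Duration.ofMillis(5_000), assumedRate)); // 500
    }
}
```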

In a job that has checkpointing enabled this would not be a problem, because 
new checkpoints are regularly written to disk and the restarted job would 
continue from the last written checkpoint. 

The behavior I expected for a job that does not use checkpointing is that on 
failure it would continue from the offsets stored in Kafka. It should use the 
savepoint to reinitialize itself, but after that is done it should completely 
ignore it. This way, if the job were to fail a year after it was restored from a 
savepoint, at most _auto.commit.interval.ms_ worth of events would be replayed, 
which is exactly how the job would behave had you not restored it from a 
savepoint.
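
The difference between the two behaviors can be sketched as a toy model in 
plain Java. This is not Flink's implementation and it uses no Flink or Kafka 
classes. Every name in it is hypothetical, and the offsets are arbitrary 
numbers chosen for illustration.

```java
import java.util.OptionalLong;

// Toy model of "where does the consumer start reading after a restart?".
// NOT Flink code: all names and values here are illustrative assumptions.
final class RestartOffsetSketch {

    /** Behavior reported in this issue: savepoint offsets win on every restart. */
    static long reportedStartOffset(OptionalLong savepointOffset, long brokerCommittedOffset) {
        return savepointOffset.isPresent() ? savepointOffset.getAsLong() : brokerCommittedOffset;
    }

    /** Expected behavior: the savepoint only counts for the first start after the restore. */
    static long expectedStartOffset(OptionalLong savepointOffset,
                                    long brokerCommittedOffset,
                                    boolean firstStartAfterRestore) {
        if (firstStartAfterRestore && savepointOffset.isPresent()) {
            return savepointOffset.getAsLong();
        }
        return brokerCommittedOffset;
    }

    public static void main(String[] args) {
        OptionalLong savepoint = OptionalLong.of(1_000L); // offset captured in the savepoint
        long committed = 5_000_000L;                      // offset auto-committed to the broker since then

        // A crash long after the restore, i.e. not the first start.
        System.out.println(reportedStartOffset(savepoint, committed));        // 1000
        System.out.println(expectedStartOffset(savepoint, committed, false)); // 5000000
    }
}
```

In the model, both variants agree on the first start after the restore. They 
only diverge on later automatic restarts, which is the case described above.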

One could argue that you should then simply not create a savepoint for these 
jobs, which would solve the problem. But when you are upgrading Flink and that 
server has many jobs running on it, you might not know which jobs do or do not 
have checkpointing enabled. If you follow the instructions in the upgrade 
guide to the letter, you will inadvertently create serious problems. 

 



> Restoring job that does not use checkpointing from savepoint breaks
> -------------------------------------------------------------------
>
>                 Key: FLINK-8467
>                 URL: https://issues.apache.org/jira/browse/FLINK-8467
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>            Reporter: Jelmer Kuperus
>            Priority: Major
>
> When checkpointing is disabled, the Flink Kafka Consumer relies on the 
> periodic offsets that are committed to the broker by the internal Kafka 
> client. Such a job would, upon restart, continue from the committed offsets. 
> However, when the job is restored from a savepoint, the offsets within the 
> savepoint supersede the broker-based offsets.
> Here's a simple project that demonstrates the problem: 
> [https://github.com/jelmerk/flink-cancel-restart-job-without-checkpointing]
> And a link to the mailing list thread: 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Starting-a-job-that-does-not-use-checkpointing-from-a-savepoint-is-broken-td17874.html]
>  
> If this is not something you want to address, at least the upgrading guide 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/upgrading.html])
> should caution against this.



