[ https://issues.apache.org/jira/browse/FLINK-8467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384252#comment-16384252 ]
Jelmer Kuperus edited comment on FLINK-8467 at 3/2/18 10:37 PM: ----------------------------------------------------------------

Sorry for the late response. I just realized that notifications go to an old email address.

I understand and agree that when you create a savepoint, the offsets stored in the savepoint should be used after a restore from that savepoint. However, if a job that does not use checkpointing is restored from a savepoint and later crashes, forcing an automatic restart, it will reprocess *all* the messages received since the savepoint. Even if this restart were to happen, say, a year after the job was restored from the savepoint, it would reprocess a year's worth of data! (assuming the retention on your topic is that high, etc.)

In a job that has checkpointing enabled this would not be a problem, because new checkpoints are regularly written to disk, and the restarted job would continue from the last written checkpoint.

The behavior I expected for a job that does not use checkpointing is that on failure it would continue from the offsets stored in Kafka. It should use the savepoint to reinitialize itself, but after that is done it should completely ignore it. This way, if the job failed a year after it was restored from a savepoint, at most _auto.commit.interval.ms_ worth of events would be replayed, which is exactly how the job would behave had you not restored it from a savepoint.

One could argue that you should then simply not create a savepoint for these jobs, which would solve the problem. But when you are upgrading Flink and the server has many jobs running on it, you might not know which jobs do or do not have checkpointing enabled. And if you follow the instructions in the upgrade guide to the letter, you will inadvertently create serious problems.
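To make the observed-vs-expected behavior concrete, here is a small Python sketch of which offset the job would resume from in each scenario. This is purely an illustrative model of the logic described above; the function and parameter names are hypothetical and are not Flink or Kafka APIs.

```python
def resume_offset(savepoint_offset, committed_offset, checkpoint_offset,
                  failure_restart, observed=True):
    """Model of the offset a restarted job resumes from.

    observed=True  models the behavior reported in this issue;
    observed=False models the behavior the reporter expected.
    """
    if not failure_restart:
        # Explicit restore from a savepoint: savepoint offsets win (agreed behavior).
        return savepoint_offset
    if checkpoint_offset is not None:
        # Checkpointing enabled: resume from the latest checkpoint (not a problem).
        return checkpoint_offset
    # Checkpointing disabled, automatic restart after a crash:
    if observed:
        # Reported: the (possibly year-old) savepoint offsets are used again,
        # replaying everything received since the savepoint.
        return savepoint_offset
    # Expected: resume from the offsets Kafka's auto-commit last wrote, so at
    # most auto.commit.interval.ms worth of events are replayed.
    return committed_offset


# A job restored from a savepoint taken at offset 100 runs for a long time;
# Kafka's periodic auto-commit has meanwhile advanced to offset 90000.
assert resume_offset(100, 90000, None, failure_restart=True) == 100           # observed
assert resume_offset(100, 90000, None, failure_restart=True,
                     observed=False) == 90000                                 # expected
```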
> Restoring job that does not use checkpointing from savepoint breaks
> -------------------------------------------------------------------
>
>                 Key: FLINK-8467
>                 URL: https://issues.apache.org/jira/browse/FLINK-8467
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>            Reporter: Jelmer Kuperus
>            Priority: Major
>
> When checkpointing is disabled, the Flink Kafka Consumer relies on the periodic offsets that are committed to the broker by the internal Kafka client. Such a job would, upon restart, continue from the committed offsets. However, when the job is restored from a savepoint, the offsets within the savepoint supersede the broker-based offsets.
>
> Here's a simple project that demonstrates the problem: [https://github.com/jelmerk/flink-cancel-restart-job-without-checkpointing]
>
> And a link to the mailing list thread: [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Starting-a-job-that-does-not-use-checkpointing-from-a-savepoint-is-broken-td17874.html]
>
> If this is not something you want to address, at least the upgrading guide ([https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/upgrading.html]) should caution against this.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)