[
https://issues.apache.org/jira/browse/FLINK-8467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384252#comment-16384252
]
Jelmer Kuperus edited comment on FLINK-8467 at 3/2/18 10:36 PM:
----------------------------------------------------------------
Sorry for the late response. I just realized that notifications go to an old
email address.
I understand and agree that when you create a savepoint, the offsets stored in
the savepoint should be used after a restore from that savepoint.
However, if a job that does not use checkpointing is restored from a savepoint
and later crashes, forcing an automatic restart, it will reprocess *all* the
messages received since the savepoint. Even if this restart were to happen,
say, a year after the job was restored from the savepoint, it would reprocess
a year's worth of data! (assuming the retention on your topic is that high,
etc.)
In a job that has checkpointing enabled this would not be a problem, because
new checkpoints are regularly written to disk, and the restarted job would
continue from the last written checkpoint.
The behavior I expected for a job that does not use checkpointing is that on
failure it would continue from the offsets stored in Kafka. It should use the
savepoint to reinitialize itself, but after that is done it should ignore the
savepoint completely. This way, if the job failed a year after it was restored
from a savepoint, at most _auto.commit.interval.ms_ worth of events would be
replayed, which is exactly how the job would behave had you not restored it
from a savepoint.
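The resume rule I am arguing for could be sketched as a simple decision
function (a hypothetical illustration only, not Flink internals; all names
and parameters here are made up for the example):

```java
// OffsetResume.java -- a sketch of the proposed resume rule: honor the
// savepoint only on the initial restore, then prefer checkpoints, and for
// non-checkpointed jobs fall back to the broker-committed Kafka offsets.
public class OffsetResume {

    // Returns the offset a restarted consumer should resume from.
    // Parameter names are illustrative, not Flink API.
    public static long resumeOffset(boolean checkpointingEnabled,
                                    Long lastCheckpointOffset,
                                    Long savepointOffset,
                                    long brokerCommittedOffset,
                                    boolean isInitialRestore) {
        if (isInitialRestore && savepointOffset != null) {
            // Restoring from a savepoint: the savepoint offsets win, as agreed.
            return savepointOffset;
        }
        if (checkpointingEnabled && lastCheckpointOffset != null) {
            // Normal checkpointed recovery: continue from the last checkpoint.
            return lastCheckpointOffset;
        }
        // Proposed behavior for non-checkpointed jobs: after the initial
        // restore, ignore the savepoint and use the broker-committed offsets,
        // so at most auto.commit.interval.ms worth of events is replayed.
        return brokerCommittedOffset;
    }
}
```

Under this rule an automatic restart of a non-checkpointed job would behave
exactly as if the job had never been restored from a savepoint.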
One could argue that you should then simply not create a savepoint for these
jobs, which would solve the problem. But when you are upgrading Flink and the
server has many jobs running on it, you might not know which jobs do or do not
have checkpointing enabled. And if you follow the instructions in the upgrade
guide to the letter, you will inadvertently create serious problems.
> Restoring job that does not use checkpointing from savepoint breaks
> -------------------------------------------------------------------
>
> Key: FLINK-8467
> URL: https://issues.apache.org/jira/browse/FLINK-8467
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Reporter: Jelmer Kuperus
> Priority: Major
>
> When checkpointing is disabled, the Flink Kafka Consumer relies on the
> periodic offsets that are committed to the broker by the internal Kafka
> client. Such a job would, upon restart, continue from the committed offsets.
> However, in the situation that the job is restored from a savepoint, the
> offsets within the savepoint supersede the broker-based offsets.
> Here's a simple project that demonstrates the problem :
> [https://github.com/jelmerk/flink-cancel-restart-job-without-checkpointing]
> And a link to the mailing list thread :
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Starting-a-job-that-does-not-use-checkpointing-from-a-savepoint-is-broken-td17874.html]
>
> If this is not something you want to address, at least the upgrading guide
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/upgrading.html])
> should caution against this.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)