[
https://issues.apache.org/jira/browse/FLINK-26683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510519#comment-17510519
]
Dawid Wysakowicz commented on FLINK-26683:
------------------------------------------
{quote}
In my case, the task doesn't need commit. I agree with you when it comes to
committing.
{quote}
Flink does not know if a UDF commits side effects or not. It must always assume
it does and therefore needs to try to recover and commit them.
Together with [~pnowojski] we gave this issue some more thought and came to
the conclusion that there is indeed a problem with how we react to failures
while committing the side effects of a stop-with-savepoint.
First of all, if the described failure occurs, we must restore to the
savepoint, not to the previous checkpoint. Otherwise we might end up producing
duplicates of results that were already partially committed for the savepoint.
We will address this regression for 1.15 in FLINK-26783.
Next, we need to distinguish two kinds of stop-with-savepoint, because they
face slightly different problems. The two modes are 1) with drain and 2)
without drain.
Neither of these modes should depend on the existence of a savepoint, because
savepoints are owned by the user. It is therefore undesirable to restore to a
normal state and continue processing records.
Moreover, the with-drain mode has an additional problem: drain flushes all
records, i.e. all window aggregates are flushed downstream. Therefore we should
not process any new records after that. However, we do not store any
information in the savepoint saying that it was taken while stopping with
drain. This implies we would need to depend on the UDF implementation storing
some marker when {{finish()}} is called, so that it does not emit any records
after recovery. Even though UDFs should behave that way, we cannot assume they
do.
We suggest changing this so that when a failure occurs while committing side
effects for stop-with-savepoint, we restore the state only to commit the side
effects. We do not start a proper job and do not consume records. This
information would need to be stored alongside the savepoint in the case of
stop-with-savepoint --drain, whereas it could be kept only transiently in the
{{CompletedCheckpointStore}} in the case of stop-with-savepoint --no-drain. The
difference results from the fact that a --no-drain savepoint can be, and
usually is, used for restarting regular processing. That is not the case for
drain, whose savepoint might only ever be restored for the purpose of
committing side effects.
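To make the proposal above more concrete, here is a minimal sketch of the
decision it describes. All names in it ({{StopWithSavepointRecovery}},
{{StopMode}}, {{PendingStop}}, {{RecoveryAction}}) are hypothetical and do not
exist in Flink; it only illustrates where the "commit only" marker would live
and how a restore would react to it.

```java
import java.util.Optional;

/** Hypothetical sketch; none of these types exist in Flink. */
final class StopWithSavepointRecovery {

    /** The two kinds of stop-with-savepoint discussed above. */
    enum StopMode { DRAIN, NO_DRAIN }

    /** What a restore is allowed to do. */
    enum RecoveryAction { RESUME_PROCESSING, COMMIT_SIDE_EFFECTS_ONLY }

    /** Marker for a stop-with-savepoint whose side effects are not yet committed. */
    static final class PendingStop {
        final StopMode mode;

        PendingStop(StopMode mode) {
            this.mode = mode;
        }
    }

    /**
     * With --drain the marker has to be stored alongside the savepoint, because
     * that savepoint may only ever be restored to commit side effects. With
     * --no-drain it can stay transient (e.g. next to the completed checkpoints),
     * because the savepoint is normally used to restart regular processing.
     */
    static boolean mustPersistWithSavepoint(StopMode mode) {
        return mode == StopMode.DRAIN;
    }

    /**
     * If a marker is present, restore the state only to commit side effects:
     * do not start a proper job and do not consume records.
     */
    static RecoveryAction onRestore(Optional<PendingStop> pending) {
        return pending.isPresent()
                ? RecoveryAction.COMMIT_SIDE_EFFECTS_ONLY
                : RecoveryAction.RESUME_PROCESSING;
    }
}
```

A restore without a marker behaves as it does today, so regular recovery from
checkpoints would be unaffected under this sketch.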
Ideally, we should change the CLI so that it only returns the result of
stop-with-savepoint once both the savepoint has succeeded and the side effects
are committed (the job has reached the FINISHED state). There could also be an
option to cancel that operation (e.g. if it falls into an infinite restart
loop), which would print a warning such as: "the savepoint is consistent, but
it might have uncommitted side effects".
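The CLI behaviour described above could be sketched roughly as follows. This
is not Flink's CLI code: {{StopWithSavepointCliOutcome}}, its {{JobState}} enum
(a simplified subset of states) and the {{userCancelledWait}} flag are
assumptions made purely for illustration.

```java
/** Hypothetical sketch; not Flink's CLI code. */
final class StopWithSavepointCliOutcome {

    /** Simplified, hypothetical subset of job states. */
    enum JobState { RUNNING, RESTARTING, FINISHED }

    /** What the CLI would do at a given point in time. */
    enum Outcome { KEEP_WAITING, SUCCESS, CANCELLED_WITH_WARNING }

    static final String WARNING =
            "the savepoint is consistent, but it might have uncommitted side effects";

    /**
     * Returns SUCCESS only once the savepoint has completed AND the job has
     * reached FINISHED (side effects committed). If the user cancels the wait
     * after the savepoint completed, the CLI returns with a warning instead.
     * A cancel before the savepoint completed is outside this sketch and is
     * treated as "keep waiting".
     */
    static Outcome evaluate(
            boolean savepointCompleted, JobState state, boolean userCancelledWait) {
        if (savepointCompleted && state == JobState.FINISHED) {
            return Outcome.SUCCESS;
        }
        if (savepointCompleted && userCancelledWait) {
            return Outcome.CANCELLED_WITH_WARNING;
        }
        return Outcome.KEEP_WAITING;
    }
}
```

The point of the cancel path is that a job stuck in a restart loop while
committing would otherwise block the CLI forever, even though the savepoint
itself is already usable.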
> Terminate the job anyway if savepoint finished when stop-with-savepoint
> -----------------------------------------------------------------------
>
> Key: FLINK-26683
> URL: https://issues.apache.org/jira/browse/FLINK-26683
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing, Runtime / Coordination
> Affects Versions: 1.15.0, 1.14.4
> Reporter: Liu
> Priority: Major
> Fix For: 1.16.0
>
>
> When we stop with savepoint, the savepoint finishes, but some tasks fail over
> for some reason and restart to running. In the end, some tasks are finished
> and some tasks are running. In this case, I think that we should terminate
> all the tasks anyway instead of restarting, since the savepoint has finished
> and the job has stopped consuming data. What do you think?