[ 
https://issues.apache.org/jira/browse/FLINK-26683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510519#comment-17510519
 ] 

Dawid Wysakowicz commented on FLINK-26683:
------------------------------------------

{quote}
 In my case, the task doesn't need commit. I agree with you when it comes to 
committing. 
{quote}
Flink does not know if a UDF commits side effects or not. It must always assume 
it does and therefore needs to try to recover and commit them.

Together with [~pnowojski] we gave this issue some more thought and concluded
that there is indeed a problem with how we react to failures while committing
the side effects of stop-with-savepoint.
First of all, if the described failure occurs we must restore to the savepoint,
not to the previous checkpoint. Otherwise we might produce duplicate results,
because some results may already have been partially committed for the
savepoint. We will address this regression for 1.15 in FLINK-26783.

Next, we need to distinguish two kinds of stop-with-savepoint, because they 
face slightly different problems. The two modes are 1) with drain and 2) 
without drain.

Neither mode should depend on the existence of the savepoint, because
savepoints are owned by the user. It is therefore undesirable to restore to a
normal state and continue processing records.
Moreover, the with-drain mode has an additional problem: draining flushes all
records, i.e. all window aggregates are flushed downstream. Therefore we should
not process any new records after that. However, we do not store any
information in the savepoint saying that it was taken while stopping with
drain. This implies we would have to rely on the UDF implementation storing
some marker when {{finish()}} is called, so that it does not emit any records
after recovery. Even though UDFs should behave that way, we cannot assume they
do.

We suggest changing this so that when a failure occurs while committing side
effects for stop-with-savepoint, we restore the state only to commit the
side effects. We do not start a proper job and do not consume records. This
information would need to be stored alongside the savepoint in the case of
stop-with-savepoint --drain, whereas it could be kept only transiently in the
{{CompletedCheckpointStore}} in the case of stop-with-savepoint --no-drain. The
difference results from the fact that --no-drain can be, and usually is, used
for restarting regular processing. That is not the case for --drain, which
would only ever be restored for the purpose of committing side effects.
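
To make the proposal easier to discuss, here is a minimal sketch of the
decision it implies. It is only an illustration of my reading of the paragraph
above, not Flink code: every type and method name below is hypothetical, and
the two "restore kinds" are an assumption introduced to model where the marker
is visible.

```java
/** Hypothetical sketch; none of these types exist in Flink. */
final class StopWithSavepointRecoverySketch {

    /** The two stop-with-savepoint modes. */
    enum StopMode { DRAIN, NO_DRAIN }

    /** Assumed places where the "commit only" marker could be kept. */
    enum MarkerLocation { ALONGSIDE_SAVEPOINT, TRANSIENT_IN_CHECKPOINT_STORE }

    /** Assumed ways in which the savepoint state can be restored. */
    enum RestoreKind { FAILOVER_WHILE_COMMITTING, NEW_JOB_FROM_SAVEPOINT }

    /** What the restored job is allowed to do. */
    enum RecoveryAction { COMMIT_SIDE_EFFECTS_ONLY, RESUME_PROCESSING }

    /** --drain persists the marker with the savepoint; --no-drain keeps it transient. */
    static MarkerLocation markerLocation(StopMode mode) {
        return mode == StopMode.DRAIN
                ? MarkerLocation.ALONGSIDE_SAVEPOINT
                : MarkerLocation.TRANSIENT_IN_CHECKPOINT_STORE;
    }

    /**
     * A persisted marker is seen by every restore. A transient marker is seen
     * only by a failover of the job that was stopping, not by a new job that
     * a user later starts from the savepoint.
     */
    static RecoveryAction actionOnRestore(StopMode mode, RestoreKind kind) {
        boolean markerVisible =
                markerLocation(mode) == MarkerLocation.ALONGSIDE_SAVEPOINT
                        || kind == RestoreKind.FAILOVER_WHILE_COMMITTING;
        return markerVisible
                ? RecoveryAction.COMMIT_SIDE_EFFECTS_ONLY
                : RecoveryAction.RESUME_PROCESSING;
    }
}
```

Under these assumptions, the only combination that resumes regular processing
is a new job started from a --no-drain savepoint.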

Ideally, we should change the CLI so that it returns the result of
stop-with-savepoint only once both the savepoint has succeeded and the
side effects are committed (the job reached the FINISHED state). There could
also be an option to cancel that operation (e.g. if it falls into an infinite
restart loop), which would print a warning such as: "the savepoint is
consistent, but it might have uncommitted side-effects"
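
The intended CLI behaviour could be sketched roughly as follows. This is a
hypothetical illustration only: the class, the state enum and the success
message are invented for the example and do not correspond to the existing
CLI or to Flink's own job status type. An empty result stands for "keep
waiting", and savepoint failure handling is left out.

```java
import java.util.Optional;

/** Hypothetical sketch; this is not the existing Flink CLI. */
final class StopWithSavepointCliSketch {

    /** Simplified, assumed job states; only FINISHED matters here. */
    enum SketchJobState { RUNNING, RESTARTING, FINISHED }

    static final String UNCOMMITTED_WARNING =
            "the savepoint is consistent, but it might have uncommitted side-effects";

    /**
     * Returns the text the CLI would print, or empty if it should keep waiting.
     * Error reporting for a failed savepoint is out of scope for this sketch.
     */
    static Optional<String> cliOutput(
            boolean savepointSucceeded, SketchJobState state, boolean cancelRequested) {
        if (!savepointSucceeded) {
            return Optional.empty();
        }
        if (state == SketchJobState.FINISHED) {
            // Savepoint done and side effects committed: report success.
            return Optional.of("stop-with-savepoint completed");
        }
        if (cancelRequested) {
            // The user gave up waiting, e.g. because of a restart loop.
            return Optional.of("WARNING: " + UNCOMMITTED_WARNING);
        }
        return Optional.empty();
    }
}
```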

> Terminate the job anyway if savepoint finished when stop-with-savepoint
> -----------------------------------------------------------------------
>
>                 Key: FLINK-26683
>                 URL: https://issues.apache.org/jira/browse/FLINK-26683
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / Coordination
>    Affects Versions: 1.15.0, 1.14.4
>            Reporter: Liu
>            Priority: Major
>             Fix For: 1.16.0
>
>
> When we stop with savepoint, the savepoint finishes. But some tasks failover 
> for some reason and restart to running. In the end, some tasks are finished 
> and some tasks are running. In this case, I think that we should terminate 
> all the tasks anyway instead of restarting since the savepoint is finished 
> and the job stops consuming data. What do you think?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
