[ 
https://issues.apache.org/jira/browse/FLINK-23021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17372002#comment-17372002
 ] 

Dawid Wysakowicz commented on FLINK-23021:
------------------------------------------

Making notes here:
The check should be performed in {{CheckpointCoordinator}} when assigning 
states to operators, before the job has actually been scheduled. A good 
place for that would be e.g. 
{{CheckpointCoordinator#restoreLatestCheckpointedStateInternal}}. As input, 
we get a {{JobGraph}} (or, to be precise, a set of {{ExecutionJobVertex}} 
instances). The problem is that we do not have good access to the connections 
between operators. We only have connections at the task level, and a task may 
contain several chained operators. Connections between operators are 
cryptically encoded in the {{StreamConfig}}, which, even worse, is not in the 
same module as {{CheckpointCoordinator}}.

One might ask why we need connections at the operator level. It is 
true that when a checkpoint is taken, either all operators in a StreamTask are 
finished or none are, but on restore the JobGraph might be chained differently. 
E.g.
Before restore:
{code}
[ A --> B ] --> [ C --> D]
{code}
we take a checkpoint where A and B are finished.
After restore:
{code}
[ A --> B --> C --> D ] 
{code}
This example illustrates that we can have both finished and unfinished 
operators in a single task. Moreover, users can add a new operator at any point 
in the DAG, e.g. between A and B. Putting an unfinished operator E between the 
finished operators A and B should be illegal. In order to throw an exception in 
such a case we need to know the exact connections between operators.
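
To make the intended check concrete, here is a minimal sketch, assuming we 
already had an operator-level edge list (which is exactly what is hard to get 
today). The class {{FinishedOperatorCheck}}, the method {{findIllegalEdge}} and 
the plain string operator names are all hypothetical and not Flink APIs. The 
rule it encodes (an unfinished operator must not feed a finished one) is my 
reading of the example above, not a settled design.

```java
import java.util.*;

// Hypothetical sketch, not Flink code: operators are plain names and the
// operator-level edges are assumed to be available as an adjacency map.
public class FinishedOperatorCheck {

    /**
     * Returns the first edge "u -> v" where v is finished but u is not,
     * or an empty Optional if no such edge exists.
     */
    static Optional<String> findIllegalEdge(
            Map<String, List<String>> edges, Set<String> finished) {
        for (Map.Entry<String, List<String>> entry : edges.entrySet()) {
            String upstream = entry.getKey();
            if (finished.contains(upstream)) {
                continue; // a finished upstream never violates the rule
            }
            for (String downstream : entry.getValue()) {
                if (finished.contains(downstream)) {
                    return Optional.of(upstream + " -> " + downstream);
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Set<String> finished = Set.of("A", "B");

        // Restored graph, only re-chained: A --> B --> C --> D
        Map<String, List<String>> rechained = new LinkedHashMap<>();
        rechained.put("A", List.of("B"));
        rechained.put("B", List.of("C"));
        rechained.put("C", List.of("D"));
        System.out.println(findIllegalEdge(rechained, finished).isPresent()); // false

        // New unfinished operator E inserted between A and B
        Map<String, List<String>> modified = new LinkedHashMap<>();
        modified.put("A", List.of("E"));
        modified.put("E", List.of("B"));
        modified.put("B", List.of("C"));
        modified.put("C", List.of("D"));
        System.out.println(findIllegalEdge(modified, finished).orElse("ok")); // E -> B
    }
}
```

The check itself is a single pass over the edges. The difficult part remains 
obtaining those operator-level edges inside {{CheckpointCoordinator}}.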

> Check for illegal modifications of JobGraph with finished operators
> -------------------------------------------------------------------
>
>                 Key: FLINK-23021
>                 URL: https://issues.apache.org/jira/browse/FLINK-23021
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Yun Gao
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>             Fix For: 1.14.0
>
>
> Users might modify the job topology before restarting from an external 
> checkpoint or savepoint. To overcome this issue, we would need to check 
> whether a fully finished operator has been added after a non-fully-finished 
> operator. If so, we would throw an exception to disallow this situation or 
> re-mark the fully finished operator as alive. 



