[
https://issues.apache.org/jira/browse/FLINK-23021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17372002#comment-17372002
]
Dawid Wysakowicz edited comment on FLINK-23021 at 6/30/21, 1:37 PM:
--------------------------------------------------------------------
Making notes here:
The check should be performed in {{CheckpointCoordinator}} when assigning
states to operators, before the job is actually scheduled. A good place for
that would be, e.g.,
{{CheckpointCoordinator#restoreLatestCheckpointedStateInternal}}. As input,
we get a {{JobGraph}} (or, to be precise, a set of {{ExecutionJobVertices}}).
The problem is that we do not have good access to the connections between
operators. We only have connections at the task level, and a task may contain
chained operators. Connections between operators are cryptically encoded in
the {{StreamConfig}}, which, even worse, is not in the same module as
{{CheckpointCoordinator}}.
One might ask why we need connections at the operator level. It is true that
when a checkpoint is taken, either all operators in a StreamTask are finished
or none are, but on restore the JobGraph might be chained differently.
E.g.
Before restore:
{code}
[ A --> B ] --> [ C --> D ]
{code}
We take a checkpoint where A and B are finished.
After restore:
{code}
[ A --> B --> C --> D ]
{code}
This example illustrates that a single task can contain both finished and
unfinished operators. Users can also add a new operator at any point in the
DAG, e.g. between A and B. Putting an unfinished operator E between the
finished operators A and B should be illegal. To throw an exception in such a
case, we need to know the exact connections between operators.
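A minimal sketch of the operator-level check described above, assuming the operator connections were available as a plain adjacency map. The class {{FinishedOperatorCheck}} and the method {{findIllegalEdges}} are hypothetical names for illustration and are not part of Flink. The rule applied here (an unfinished operator must not feed a finished one) is an assumption derived from the issue description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical helper, not part of Flink: validates a restored operator graph. */
class FinishedOperatorCheck {

    /**
     * Returns every edge "u -> v" in which v is finished while u is not.
     * Operators missing from the finished set, including newly added ones,
     * are treated as unfinished.
     */
    static List<String> findIllegalEdges(
            Map<String, List<String>> downstream, Set<String> finished) {
        List<String> violations = new ArrayList<>();
        // Copy into a TreeMap so the report has a deterministic order.
        for (Map.Entry<String, List<String>> entry : new TreeMap<>(downstream).entrySet()) {
            String upstream = entry.getKey();
            if (finished.contains(upstream)) {
                continue; // a finished operator may feed any operator
            }
            for (String consumer : entry.getValue()) {
                if (finished.contains(consumer)) {
                    violations.add(upstream + " -> " + consumer);
                }
            }
        }
        return violations;
    }

    public static void main(String[] args) {
        Set<String> finished = Set.of("A", "B");

        // Rechained graph from the example: A --> B --> C --> D
        Map<String, List<String>> rechained = Map.of(
                "A", List.of("B"), "B", List.of("C"), "C", List.of("D"));
        System.out.println(findIllegalEdges(rechained, finished));

        // Same graph with a new, unfinished operator E between A and B
        Map<String, List<String>> withE = Map.of(
                "A", List.of("E"), "E", List.of("B"),
                "B", List.of("C"), "C", List.of("D"));
        System.out.println(findIllegalEdges(withE, finished));
    }
}
```

The sketch only works because it is handed operator-to-operator edges directly, which is exactly the information that is hard to obtain from {{CheckpointCoordinator}} today.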
It gets even more complicated if we need to verify the precondition across
different tasks:
E.g.
Before restore:
{code}
[ A ]-----------
               |
[ B --> C ] --> [ D ]
{code}
We take a checkpoint when A, B, C are finished. D is running.
After restore:
{code}
[ A --> E? ]
|
[ B --> C --> D ]
{code}
The question here is whether we can add an unfinished operator E. To answer
it, we need to know the exact operator that consumes the
{{IntermediateDataSet}} produced by operator E. At the same time, we must
know which {{IntermediateDataSet}} is produced by operator E.
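To illustrate why task-level connections are not enough here, the following sketch evaluates the new operator E against each operator of the consuming task in turn. The class {{ConsumerAmbiguity}} and its methods are hypothetical and not Flink API, and the legality rule (an unfinished producer must not feed a finished consumer) is an assumption based on the issue description.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration, not Flink API. */
class ConsumerAmbiguity {

    /** An edge is legal unless an unfinished producer feeds a finished consumer. */
    static boolean isLegalEdge(String producer, String consumer, Set<String> finished) {
        return finished.contains(producer) || !finished.contains(consumer);
    }

    /**
     * Computes the verdict for each operator in the consuming task that
     * might be the one reading the producer's data set.
     */
    static Map<String, Boolean> verdictPerCandidate(
            String producer, List<String> consumingChain, Set<String> finished) {
        Map<String, Boolean> verdicts = new LinkedHashMap<>();
        for (String candidate : consumingChain) {
            verdicts.put(candidate, isLegalEdge(producer, candidate, finished));
        }
        return verdicts;
    }

    public static void main(String[] args) {
        // State from the example: A, B, C finished; D running; E newly added.
        Set<String> finished = Set.of("A", "B", "C");
        // The consuming task after restore is the chain [ B --> C --> D ].
        System.out.println(verdictPerCandidate("E", List.of("B", "C", "D"), finished));
    }
}
```

Under these assumptions the verdict differs depending on which chained operator actually consumes E's output, so knowing only that the task is the consumer cannot settle the question.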
> Check for illegal modifications of JobGraph with finished operators
> -------------------------------------------------------------------
>
> Key: FLINK-23021
> URL: https://issues.apache.org/jira/browse/FLINK-23021
> Project: Flink
> Issue Type: Sub-task
> Reporter: Yun Gao
> Assignee: Dawid Wysakowicz
> Priority: Major
> Fix For: 1.14.0
>
>
> Users might modify the job topology before restarting from an external
> checkpoint or savepoint. To handle this, we would need to check whether a
> fully finished operator has been added after a non-fully-finished operator.
> If so, we would either throw an exception to disallow this situation or
> re-mark the fully finished operator as alive.