[ https://issues.apache.org/jira/browse/FLINK-27787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stephen Patel updated FLINK-27787:
----------------------------------
Description:
I think I've found a bug in how new tasks are restored from savepoints.
I have some Beam-on-Flink pipelines that I restore from savepoints after adding new
source types (sources not present in the DAG that generated the savepoint).
On Flink 1.10.1 (Beam 2.29) this works fine: the DAG spins up with the new
sources and starts emitting data.
On Flink 1.14.4 (Beam 2.38) this no longer works: the DAG spins up, but the
sources never emit anything.
I checked the Beam source wrapper, which maps the Beam source onto the underlying
runner source
(https://github.com/apache/beam/blob/v2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L427),
but it hasn't changed in several years.
By inserting some logging statements, I was able to determine that on Flink
1.10 the source is told that it is NOT restored
(FunctionInitializationContext.isRestored() returns false), whereas on Flink 1.14
it is told that it IS restored.
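A wrong isRestored() answer plausibly explains why the sources go silent rather than merely skipping a restore step. The sketch below is a simplified, hypothetical model, not Beam's actual code: it assumes a source that, when told it is restored, reopens only the splits it finds in operator state, and otherwise distributes its own splits round-robin across subtasks (roughly the pattern the linked wrapper appears to follow). The class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of split assignment in a restorable source (not Beam's code). */
final class SourceRestoreModel {

    /**
     * Returns the splits one subtask will read.
     *
     * @param isRestored    what the runtime reports via isRestored()
     * @param restoredState splits found in operator state (empty for a brand-new operator)
     * @param allSplits     splits computed from the pipeline definition
     * @param subtaskIndex  index of this parallel subtask
     * @param numSubtasks   parallelism of the source
     */
    static List<String> assignSplits(boolean isRestored, List<String> restoredState,
                                     List<String> allSplits, int subtaskIndex, int numSubtasks) {
        List<String> local = new ArrayList<>();
        if (isRestored) {
            // Trust the checkpoint: only reopen splits that were saved for this subtask.
            local.addAll(restoredState);
        } else {
            // Fresh start: spread the splits round-robin across subtasks.
            for (int i = 0; i < allSplits.size(); i++) {
                if (i % numSubtasks == subtaskIndex) {
                    local.add(allSplits.get(i));
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        List<String> splits = List.of("split-0", "split-1", "split-2");
        // New source correctly reported as not restored (the Flink 1.10 behavior in this report).
        System.out.println(assignSplits(false, List.of(), splits, 0, 2)); // [split-0, split-2]
        // New source wrongly reported as restored, with no saved state (the Flink 1.14 behavior).
        System.out.println(assignSplits(true, List.of(), splits, 0, 2)); // []
    }
}
```

In this model, a new subtask that is told it is restored but has no saved state ends up with an empty split list and therefore has nothing to read, which matches the symptom described above.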
By tracing through the Flink code changes, I think I've determined that the changes
introduced for FLINK-23854 cause the logic in
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.java to behave
differently than it used to.
In 1.13.6, we see
[here|https://github.com/apache/flink/blob/release-1.13.6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L221-L240]
that if a subtask does not have state, it will not have a
JobManagerTaskRestore instance set on it, so the
FunctionInitializationContext.isRestored() method will return false.
In 1.14.0 (and all released versions since), we see
[here|https://github.com/apache/flink/blob/release-1.14.0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L236-L256]
that the logic that used to handle subtasks without state has been removed.
Thus, when restoring from a savepoint, a task is told it has been restored even
when it did not exist in the savepoint (and therefore cannot have any state in it).
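The 1.13.6 vs 1.14.0 difference described above can be reduced to a toy model. This is a hypothetical sketch, not Flink's code: TaskRestore stands in for JobManagerTaskRestore, the two assign methods paraphrase the behavior of each version as this report describes it, and isRestored() is assumed to be true exactly when a subtask was given a restore object.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of the behavior described in this report; not Flink's code. */
final class StateAssignmentModel {

    /** Stand-in for JobManagerTaskRestore (field chosen for illustration only). */
    static final class TaskRestore {
        final long checkpointId;

        TaskRestore(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    /** 1.13.6-style: subtasks without state get no restore object. */
    static Map<Integer, TaskRestore> assign1136(long checkpointId, boolean[] subtaskHasState) {
        Map<Integer, TaskRestore> restores = new TreeMap<>();
        for (int i = 0; i < subtaskHasState.length; i++) {
            if (subtaskHasState[i]) {
                restores.put(i, new TaskRestore(checkpointId));
            }
        }
        return restores;
    }

    /** 1.14.0-style as described in the report: the stateless-subtask check is gone. */
    static Map<Integer, TaskRestore> assign1140(long checkpointId, boolean[] subtaskHasState) {
        Map<Integer, TaskRestore> restores = new TreeMap<>();
        for (int i = 0; i < subtaskHasState.length; i++) {
            restores.put(i, new TaskRestore(checkpointId));
        }
        return restores;
    }

    /** What isRestored() would report in this model: true iff a restore object was set. */
    static boolean isRestored(Map<Integer, TaskRestore> restores, int subtask) {
        return restores.containsKey(subtask);
    }

    public static void main(String[] args) {
        // A brand-new source operator: none of its 2 subtasks has savepoint state.
        boolean[] newSource = {false, false};
        System.out.println(isRestored(assign1136(42L, newSource), 0)); // false
        System.out.println(isRestored(assign1140(42L, newSource), 0)); // true
    }
}
```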
> New tasks think they've been restored from savepoint (even when they weren't
> present in that savepoint)
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-27787
> URL: https://issues.apache.org/jira/browse/FLINK-27787
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.14.0, 1.15.0, 1.14.2, 1.14.3, 1.14.4
> Reporter: Stephen Patel
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.7#820007)