[ 
https://issues.apache.org/jira/browse/FLINK-27787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17640369#comment-17640369
 ] 

Yanfei Lei commented on FLINK-27787:
------------------------------------

[~arvid] could you please take a look? I found that 
[PR17019|https://github.com/apache/flink/pull/17019] has discussed [related 
possibilities|https://github.com/apache/flink/pull/17019/files#r697712620].

> New tasks think they've been restored from savepoint (even when they weren't 
> present in that savepoint)
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-27787
>                 URL: https://issues.apache.org/jira/browse/FLINK-27787
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.0, 1.14.2, 1.14.3, 1.14.4, 1.15.0
>            Reporter: Stephen Patel
>            Priority: Major
>
> I think I've found a bug with new task restoration from savepoints.
> I have some beam-on-flink pipelines that I restore from savepoints with new 
> source types (sources not present in the dag that generated the savepoint).
> On flink 1.10.1 (beam 2.29) this works fine: the dag spins up with the new 
> sources and starts emitting data.
> On flink 1.14.4 (beam 2.38) this no longer works. The dag spins up, but the 
> sources never emit anything.
> I checked the beam source wrapper (it maps the beam source to the underlying 
> runner source: 
> https://github.com/apache/beam/blob/v2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L427),
>  but it hasn't changed in several years.
> By inserting some logging statements, I was able to determine that on flink 
> 1.10, the source is told that it is NOT restored 
> (FunctionInitializationContext.isRestored() returns false).  With flink 1.14, 
> it is told that it IS restored.
> By traversing the flink code changes, I think I've determined that the 
> changes introduced for FLINK-23854 cause the logic in 
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation to no longer 
> behave the way it used to.
> In 1.13.6, we see 
> [here|https://github.com/apache/flink/blob/release-1.13.6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L221-L240]
>  that if a subtask does not have state, it will not have a 
> JobManagerTaskRestore instance set on it, so the 
> FunctionInitializationContext.isRestored() method will return false.
> In 1.14.0 (and all released versions after that), we see 
> [here|https://github.com/apache/flink/blob/release-1.14.0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L236-L256]
>  that the chunk of logic that used to be present for dealing with subtasks 
> which have no state is no longer present.  Thus, when restoring from a 
> savepoint, the task will think it's been restored, even when it didn't exist 
> (and thus couldn't have state) in the savepoint.  
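
To make the difference described in the issue concrete, here is a minimal Java sketch of the two behaviours. It is only a simplified model under stated assumptions, not Flink or Beam code: the class RestoreFlagSketch and the methods assignSkippingEmpty, assignAlways, isRestored and openSource are hypothetical names, and an Optional stands in for the restore object that a subtask may or may not receive. The source model assumes a wrapper that creates fresh splits only when it is told it was not restored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model. None of these names are Flink or Beam classes.
public class RestoreFlagSketch {

    /** Models the older behaviour: a subtask with no state gets no restore object. */
    static Optional<List<String>> assignSkippingEmpty(List<String> subtaskState) {
        return subtaskState.isEmpty() ? Optional.empty() : Optional.of(subtaskState);
    }

    /** Models the reported newer behaviour: every subtask gets a restore object. */
    static Optional<List<String>> assignAlways(List<String> subtaskState) {
        return Optional.of(subtaskState);
    }

    /** Models the restored flag: true whenever a restore object is present. */
    static boolean isRestored(Optional<List<String>> restore) {
        return restore.isPresent();
    }

    /** Models a source that creates fresh splits only when it is not restored. */
    static List<String> openSource(Optional<List<String>> restore) {
        if (isRestored(restore)) {
            // Resume exactly what was saved, which may be nothing at all.
            return new ArrayList<>(restore.get());
        }
        // Brand-new source: start from scratch with a fresh split.
        return List.of("fresh-split-0");
    }

    public static void main(String[] args) {
        // The operator was not present in the savepoint, so it has no state.
        List<String> noState = List.of();
        System.out.println("skip-empty: " + openSource(assignSkippingEmpty(noState)));
        System.out.println("always:     " + openSource(assignAlways(noState)));
    }
}
```

In this model the new source gets a fresh split under the first assignment strategy and an empty list of splits under the second, which would match the symptom of sources that start up but never emit anything.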



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
