[ 
https://issues.apache.org/jira/browse/FLINK-28861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17576770#comment-17576770
 ] 

Timo Walther commented on FLINK-28861:
--------------------------------------

After looking into this topic, I have a first guess at what the problem might be. 
Could you share the UID of the affected operator from several runs? 
There might be a bug in the design. A UID is currently set as 
{{1_stream-exec-table-source-scan_1_external-datastream}}; however, the first 
component is generated by an {{AtomicCounter}} that is started per JVM. This 
means that multiple runs of the same main() in the same JVM might generate 
different UIDs. How do you submit your jobs?
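
To illustrate the suspected mechanism, here is a minimal, self-contained sketch. It does not use Flink's planner code: the class, the {{NEXT_ID}} counter, and the {{nextUid}} and {{planJob}} helpers are hypothetical stand-ins, and only the UID format is taken from the example above. It shows how a counter that lives for the whole JVM gives the same pipeline a different first UID component on each run.

```java
import java.util.concurrent.atomic.AtomicInteger;

class UidCounterSketch {
    // Hypothetical stand-in for the per-JVM counter: it is static, so it is
    // never reset between two executions of main() inside the same JVM.
    private static final AtomicInteger NEXT_ID = new AtomicInteger(0);

    // Hypothetical helper that builds a UID in the format quoted above:
    // <counter>_<node type>_<version>_<transformation name>
    static String nextUid(String nodeType, int version, String name) {
        return NEXT_ID.incrementAndGet() + "_" + nodeType + "_" + version + "_" + name;
    }

    // Simulates planning the same single-node pipeline once.
    static String planJob() {
        return nextUid("stream-exec-table-source-scan", 1, "external-datastream");
    }

    public static void main(String[] args) {
        // Two "submissions" of the same program from one JVM.
        String firstRun = planJob();
        String secondRun = planJob();

        System.out.println(firstRun);
        System.out.println(secondRun);
        // The pipeline is identical, but the counter prefix differs,
        // so the two UIDs are not equal: prints false.
        System.out.println(firstRun.equals(secondRun));
    }
}
```

If the job is always submitted from a fresh JVM, the counter would start from the same value each time, which is why the submission method matters here.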

In any case, there is a workaround: you can set 
{{table.exec.legacy-transformation-uids}} to {{true}}. Let me know whether this 
solves your problem for 1.15 upgrades or simple pipelines from previous 
versions. However, keep in mind that we don't yet support stateful upgrades 
between Flink versions for Flink SQL. 
[FLIP-190|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489]
 aims to fix this.
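
A minimal sketch of setting the workaround option programmatically, assuming Flink 1.15 on the classpath. The class name and the {{env}} and {{tableEnv}} variables are hypothetical; only the option key comes from the comment above. The option is set before any table is created or translated, on the assumption that it needs to be in place when the pipeline is planned.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class LegacyUidWorkaround {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Switch back to the legacy transformation UIDs (workaround from this comment).
        tableEnv.getConfig()
                .getConfiguration()
                .setString("table.exec.legacy-transformation-uids", "true");

        // Build the pipeline (fromChangelogStream, SQL query, sink) after this point.
    }
}
```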

> Cannot resume from savepoint when using fromChangelogStream in upsert mode
> --------------------------------------------------------------------------
>
>                 Key: FLINK-28861
>                 URL: https://issues.apache.org/jira/browse/FLINK-28861
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.1
>            Reporter: Colin Smetz
>            Priority: Major
>
> I want to use the savepoint mechanism to move existing jobs from one version 
> of Flink to another, by:
>  # Stopping a job with a savepoint
>  # Creating a new job from the savepoint, on the new version.
> In Flink 1.15.1, it fails, even when going from 1.15.1 to 1.15.1. I get this 
> error, meaning that it could not map the state from the previous job to the 
> new one because of one operator:
> {quote}{{Failed to rollback to checkpoint/savepoint 
> hdfs://hdfs-name:8020/flink-savepoints/savepoint-046708-238e921f5e78. Cannot 
> map checkpoint/savepoint state for operator d14a399e92154660771a806b90515d4c 
> to the new program, because the operator is not available in the new 
> program.}}
> {quote}
> After investigation, the problematic operator corresponds to a 
> {{ChangelogNormalize}} operator, that I do not explicitly create. It is 
> generated because I use [{{tableEnv.fromChangelogStream(stream, schema, 
> ChangelogMode.upsert())}}|https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.html#fromChangelogStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.table.api.Schema-org.apache.flink.table.connector.ChangelogMode-]
>  (the upsert mode is important, other modes do not fail). The table created 
> is passed to an SQL query using the SQL API, which generates something like:
> {quote}{{ChangelogNormalize[8] -> Calc[9] -> TableToDataSteam -> 
> [my_sql_transformation] -> [my_sink]}}
> {quote}
> In previous versions of Flink it seems this operator was always given the 
> same uid so the state could match when starting from the savepoint. In Flink 
> 1.15.1, I see that a different uid is generated every time. I could not find 
> a reliable way to set that uid manually. The only way I found was by going 
> backwards from the transformation:
> {quote}{{dataStream.getTransformation().getInputs().get(0).getInputs().get(0).getInputs().get(0).setUid("the_user_defined_id");}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
