[ 
https://issues.apache.org/jira/browse/FLINK-28861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17576791#comment-17576791
 ] 

Colin Smetz commented on FLINK-28861:
-------------------------------------

{quote}Could you share the UID of the affected operator with us after multiple 
tries?
{quote}
I only have what seems to be a hash of the uid (obtained via 
[getGeneratedOperatorID|https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/runtime/OperatorIDPair.html#getGeneratedOperatorID--]),
 but here's what I get for three successive submissions:
 * 3c217b755850a1fb331af4b7f67946f5
 * 2ea21244917b900c533f89ff25291e8d
 * 48f2f08e9f1663064e1369bd518990a1
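For context, those IDs were dumped from the compiled {{JobGraph}} with a small helper along these lines (a sketch; how {{jobGraph}} is obtained is assumed, see below):

{code:java}
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;

// Print the generated operator ID of every operator in every job vertex.
void dumpGeneratedOperatorIds(JobGraph jobGraph) {
    for (JobVertex vertex : jobGraph.getVertices()) {
        for (OperatorIDPair ids : vertex.getOperatorIDs()) {
            System.out.println(vertex.getName() + " -> " + ids.getGeneratedOperatorID());
        }
    }
}
{code}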

Does that help?
{quote}How do you submit your jobs?
{quote}
When we first noticed the problem, we were submitting the job with the REST API 
(POST /jars/:jarid/run). But we've since reproduced it in our tests using 
[PackagedProgramUtils.createJobGraph|https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/client/program/PackagedProgramUtils.html].
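In the tests, the JobGraph is built roughly like this (a sketch; the jar path and program arguments are placeholders):

{code:java}
import java.io.File;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;

// Compile the packaged job into a JobGraph without submitting it.
PackagedProgram program = PackagedProgram.newBuilder()
        .setJarFile(new File("/path/to/our-job.jar"))   // placeholder path
        .setArguments("--some-arg", "some-value")        // placeholder arguments
        .build();
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(
        program, new Configuration(), /* defaultParallelism */ 1, /* suppressOutput */ false);
{code}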
{quote}In any case, there is a workaround, you can set 
{{table.exec.legacy-transformation-uids}} to {{true}}. Let me know if this 
solves your problem for 1.15 upgrades or simple pipelines from previous 
versions? 
{quote}
It works indeed, thanks!
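For the record, this is roughly how we set it now (a minimal sketch; {{tableEnv}} is the job's {{StreamTableEnvironment}}):

{code:java}
// Workaround suggested above: switch back to the legacy transformation uids
// so the generated ChangelogNormalize operator keeps a stable uid across submissions.
tableEnv.getConfig()
        .getConfiguration()
        .setString("table.exec.legacy-transformation-uids", "true");
{code}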
{quote}However, keep in mind that we don't support stateful upgrades between 
Flink versions for Flink SQL yet. 
[FLIP-190|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489]
 aims to fix this.
{quote}
We are aware of that, but since it does seem to work at least sometimes, it's 
still better than nothing. Good to see there is a FLIP to support that.

> Cannot resume from savepoint when using fromChangelogStream in upsert mode
> --------------------------------------------------------------------------
>
>                 Key: FLINK-28861
>                 URL: https://issues.apache.org/jira/browse/FLINK-28861
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.1
>            Reporter: Colin Smetz
>            Priority: Critical
>
> I want to use the savepoint mechanism to move existing jobs from one version 
> of Flink to another, by:
>  # Stopping a job with a savepoint
>  # Creating a new job from the savepoint, on the new version.
> In Flink 1.15.1, it fails, even when going from 1.15.1 to 1.15.1. I get this 
> error, meaning that it could not map the state from the previous job to the 
> new one because of one operator:
> {quote}{{Failed to rollback to checkpoint/savepoint 
> hdfs://hdfs-name:8020/flink-savepoints/savepoint-046708-238e921f5e78. Cannot 
> map checkpoint/savepoint state for operator d14a399e92154660771a806b90515d4c 
> to the new program, because the operator is not available in the new 
> program.}}
> {quote}
> After investigation, the problematic operator corresponds to a 
> {{ChangelogNormalize}} operator, which I do not explicitly create. It is 
> generated because I use [{{tableEnv.fromChangelogStream(stream, schema, 
> ChangelogMode.upsert())}}|https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.html#fromChangelogStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.table.api.Schema-org.apache.flink.table.connector.ChangelogMode-]
>  (the upsert mode is important; other modes do not fail). The resulting table 
> is passed to a SQL query using the SQL API, which generates something like:
> {quote}{{ChangelogNormalize[8] -> Calc[9] -> TableToDataStream -> 
> [my_sql_transformation] -> [my_sink]}}
> {quote}
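> For completeness, a minimal sketch of the kind of setup that produces this plan (column names, types and sample rows are illustrative, not our actual pipeline):
> {code:java}
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.connector.ChangelogMode;
> import org.apache.flink.types.Row;
> import org.apache.flink.types.RowKind;
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
> // Upsert changelog keyed by the first column (illustrative data).
> DataStream<Row> changelog = env
>         .fromElements(
>                 Row.ofKind(RowKind.INSERT, 1, "a"),
>                 Row.ofKind(RowKind.UPDATE_AFTER, 1, "b"))
>         .returns(Types.ROW(Types.INT, Types.STRING));
>
> // Interpreting the stream in upsert mode is what introduces ChangelogNormalize.
> Table table = tableEnv.fromChangelogStream(
>         changelog,
>         Schema.newBuilder()
>                 .column("f0", DataTypes.INT().notNull())
>                 .column("f1", DataTypes.STRING())
>                 .primaryKey("f0")   // a primary key is required for ChangelogMode.upsert()
>                 .build(),
>         ChangelogMode.upsert());
>
> tableEnv.createTemporaryView("my_table", table);
> // ... SQL query over my_table, then back to a DataStream and a sink.
> {code}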
> In previous versions of Flink, it seems this operator was always given the 
> same uid, so the state could match when starting from the savepoint. In Flink 
> 1.15.1, I see that a different uid is generated every time. I could not find 
> a reliable way to set that uid manually. The only way I found was by going 
> backwards from the transformation:
> {quote}{{dataStream.getTransformation().getInputs().get(0).getInputs().get(0).getInputs().get(0).setUid("the_user_defined_id");}}
> {quote}
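> To make that easier to follow, this is the same navigation spelled out (a sketch; the number of {{getInputs().get(0)}} hops depends on the generated plan and may break when the plan changes):
> {code:java}
> import org.apache.flink.api.dag.Transformation;
>
> // Walk back from the resulting DataStream's transformation towards the
> // sources: each getInputs().get(0) hop moves one operator upstream, until
> // the ChangelogNormalize transformation is reached; its uid is then pinned.
> Transformation<?> changelogNormalize =
>         dataStream.getTransformation()
>                 .getInputs().get(0)
>                 .getInputs().get(0)
>                 .getInputs().get(0);
> changelogNormalize.setUid("the_user_defined_id");
> {code}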



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
