[
https://issues.apache.org/jira/browse/FLINK-28861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17576821#comment-17576821
]
Timo Walther commented on FLINK-28861:
--------------------------------------
Thanks for the response! The generated UID hashes were Flink 1.14 behavior;
we replaced them with explicit, human-readable UIDs as shown above. So going
from 1.15.1 to 1.15.1 should not show you a hash. Nevertheless, the bug that
results in different UIDs when executing the same JAR twice needs to be fixed.
> Cannot resume from savepoint when using fromChangelogStream in upsert mode
> --------------------------------------------------------------------------
>
> Key: FLINK-28861
> URL: https://issues.apache.org/jira/browse/FLINK-28861
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.15.1
> Reporter: Colin Smetz
> Priority: Critical
>
> I want to use the savepoint mechanism to move existing jobs from one version
> of Flink to another, by:
> # Stopping a job with a savepoint
> # Creating a new job from the savepoint, on the new version.
> In Flink 1.15.1, it fails, even when going from 1.15.1 to 1.15.1. I get this
> error, meaning that it could not map the state from the previous job to the
> new one because of one operator:
> {quote}{{Failed to rollback to checkpoint/savepoint
> hdfs://hdfs-name:8020/flink-savepoints/savepoint-046708-238e921f5e78. Cannot
> map checkpoint/savepoint state for operator d14a399e92154660771a806b90515d4c
> to the new program, because the operator is not available in the new
> program.}}
> {quote}
> After investigation, the problematic operator turns out to be a
> {{ChangelogNormalize}} operator that I do not explicitly create. It is
> generated because I use [{{tableEnv.fromChangelogStream(stream, schema,
> ChangelogMode.upsert())}}|https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.html#fromChangelogStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.table.api.Schema-org.apache.flink.table.connector.ChangelogMode-]
> (the upsert mode is important, other modes do not fail). The table created
> is passed to an SQL query using the SQL API, which generates something like:
> {quote}{{ChangelogNormalize[8] -> Calc[9] -> TableToDataSteam ->
> [my_sql_transformation] -> [my_sink]}}
> {quote}
> In previous versions of Flink, it seems this operator was always given the
> same uid, so the state could be matched when starting from the savepoint. In
> Flink 1.15.1, I see that a different uid is generated every time. I could not
> find a reliable way to set that uid manually. The only way I found was by
> walking backwards from the transformation:
> {quote}{{dataStream.getTransformation().getInputs().get(0).getInputs().get(0).getInputs().get(0).setUid("the_user_defined_id");}}
> {quote}
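
The chained getInputs().get(0) workaround quoted above depends on the exact number of hops between the DataStream and the ChangelogNormalize operator. A less brittle variant might search upstream by operator name instead. The following is only a minimal sketch of that idea, not Flink code: Node is a hypothetical stand-in that exposes the getInputs()/setUid() accessors used in the quoted workaround plus an assumed getName(), and the "ChangelogNormalize" name prefix is an assumption taken from the operator chain in the report.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class UidByName {

    /** Hypothetical stand-in for a transformation: a name, an optional uid, and inputs. */
    static final class Node {
        private final String name;
        private final List<Node> inputs = new ArrayList<>();
        private String uid;

        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs.addAll(List.of(inputs));
        }

        String getName() { return name; }
        List<Node> getInputs() { return inputs; }
        String getUid() { return uid; }
        void setUid(String uid) { this.uid = uid; }
    }

    /**
     * Walks upstream (breadth-first) from start and returns the first node
     * whose name starts with namePrefix, or empty if there is none.
     */
    static Optional<Node> findUpstream(Node start, String namePrefix) {
        Deque<Node> pending = new ArrayDeque<>();
        Set<Node> seen = new HashSet<>();
        pending.add(start);
        while (!pending.isEmpty()) {
            Node current = pending.poll();
            if (!seen.add(current)) {
                continue; // already visited via another path
            }
            if (current.getName().startsWith(namePrefix)) {
                return Optional.of(current);
            }
            pending.addAll(current.getInputs());
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Toy graph loosely modelled on the operator chain from the report.
        Node source = new Node("Source");
        Node normalize = new Node("ChangelogNormalize[8]", source);
        Node calc = new Node("Calc[9]", normalize);
        Node toStream = new Node("TableToDataSteam", calc);

        findUpstream(toStream, "ChangelogNormalize")
                .ifPresent(n -> n.setUid("the_user_defined_id"));

        System.out.println(normalize.getUid()); // prints "the_user_defined_id"
    }
}
```

Like the original workaround, this only sidesteps the problem; it does not address the underlying bug of unstable UIDs across executions of the same JAR.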