[ https://issues.apache.org/jira/browse/FLINK-28861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580104#comment-17580104 ]
Timo Walther commented on FLINK-28861:
--------------------------------------
Fixed in 1.15.2:
commit 105d7c911bd0c5d8634417c22164547651abf07b
{code}
[FLINK-28861][table] Make UID generation behavior configurable and plan-only by default

Before this commit, due to changes for FLIP-190, every operator generated
by the planner got a UID assigned. However, the UID is based on a static
counter that might return different results depending on the environment.
Thus, UIDs are not deterministic and make stateful restores impossible
e.g. when going from 1.15.0 -> 1.15.1. This PR restores the old pre-1.15
behavior for regular Table API. It only adds UIDs if the operator has
been created from a compiled plan. A compiled plan makes the UIDs static
and thus deterministic.

table.exec.uid.generation=ALWAYS exists for backwards compatibility and
could make stateful upgrades possible even with invalid UIDs on best
effort basis.
{code}
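The commit's argument, that a UID drawn from a static counter depends on what else was translated before it, can be illustrated with a small stand-alone Java model. This is a hypothetical sketch, not the planner's code: {{UidDriftDemo}}, {{counterUids}} and {{compilePlan}} are invented names, and "compiling a plan" is reduced to generating the UIDs once and storing them. Only the idea comes from the commit: counter-based UIDs drift between submissions (here, because another query was planned earlier in the same process), while UIDs frozen in a compiled plan do not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class UidDriftDemo {
    // Hypothetical process-wide counter, a stand-in for the static counter
    // mentioned in the commit message.
    static final AtomicInteger COUNTER = new AtomicInteger();

    // Counter-based UIDs: each value depends on how many operators were
    // translated before, not on the pipeline itself.
    static List<String> counterUids(List<String> operators) {
        List<String> uids = new ArrayList<>();
        for (String op : operators) {
            uids.add(op + "_" + COUNTER.incrementAndGet());
        }
        return uids;
    }

    // Simplified "compiled plan": UIDs are generated once and stored with
    // the plan, so every later restore reads the same values back.
    static List<String> compilePlan(List<String> operators) {
        return List.copyOf(counterUids(operators));
    }

    public static void main(String[] args) {
        List<String> pipeline = List.of("ChangelogNormalize", "Calc", "Sink");

        List<String> firstRun = counterUids(pipeline);
        counterUids(List.of("UnrelatedOperator")); // another query planned in the same JVM
        List<String> secondRun = counterUids(pipeline);
        // Same pipeline, different UIDs: savepoint state no longer matches.
        System.out.println(firstRun + " vs " + secondRun);

        List<String> plan = compilePlan(pipeline);
        counterUids(pipeline); // the counter keeps moving...
        System.out.println("stored plan UIDs: " + plan); // ...the stored plan does not
    }
}
```

In the actual fix, this choice is controlled by {{table.exec.uid.generation}}: per the commit, UIDs are assigned only to operators created from a compiled plan by default, and {{ALWAYS}} keeps the counter-based behavior for backwards compatibility.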
> Cannot resume from savepoint when using fromChangelogStream in upsert mode
> --------------------------------------------------------------------------
>
> Key: FLINK-28861
> URL: https://issues.apache.org/jira/browse/FLINK-28861
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.15.1
> Reporter: Colin Smetz
> Assignee: Timo Walther
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.16.0, 1.15.2
>
>
> I want to use the savepoint mechanism to move existing jobs from one version
> of Flink to another, by:
> # Stopping a job with a savepoint
> # Creating a new job from the savepoint, on the new version.
> In Flink 1.15.1 this fails, even when going from 1.15.1 to 1.15.1. I get the
> following error, meaning that the state of one operator in the previous job
> could not be mapped to the new one:
> {quote}{{Failed to rollback to checkpoint/savepoint
> hdfs://hdfs-name:8020/flink-savepoints/savepoint-046708-238e921f5e78. Cannot
> map checkpoint/savepoint state for operator d14a399e92154660771a806b90515d4c
> to the new program, because the operator is not available in the new
> program.}}
> {quote}
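Flink matches the operator state stored in a savepoint to the operators of the new job by operator ID, which is derived from the UID when one is set and otherwise from the job topology. The class below is a simplified, hypothetical Java model of that matching step, not Flink's restore code; the operator IDs in {{main}} are invented except the one quoted in the error. The {{allowNonRestoredState}} parameter mirrors Flink's real {{--allowNonRestoredState}} option, which lets a job start while skipping unmatched state, so for a stateful operator such as {{ChangelogNormalize}} it only avoids the error by not restoring that operator's state.

```java
import java.util.Set;

class SavepointMappingCheck {
    // Simplified model of the check behind the error above: every operator ID
    // found in the savepoint must exist in the new job, unless unmatched state
    // is explicitly allowed to be skipped.
    static void checkRestore(Set<String> savepointOperatorIds,
                             Set<String> newJobOperatorIds,
                             boolean allowNonRestoredState) {
        for (String id : savepointOperatorIds) {
            if (!newJobOperatorIds.contains(id) && !allowNonRestoredState) {
                throw new IllegalStateException(
                        "Cannot map checkpoint/savepoint state for operator " + id
                                + " to the new program, because the operator is not"
                                + " available in the new program.");
            }
        }
    }

    public static void main(String[] args) {
        Set<String> savepoint = Set.of("source-op-id", "d14a399e92154660771a806b90515d4c");
        // The resubmitted job derived a new (hypothetical) ID for ChangelogNormalize.
        Set<String> newJob = Set.of("source-op-id", "new-normalize-op-id");
        try {
            checkRestore(savepoint, newJob, false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // Passes, but in a real job the ChangelogNormalize state would not be restored.
        checkRestore(savepoint, newJob, true);
    }
}
```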
> After investigation, the problematic operator turned out to be a
> {{ChangelogNormalize}} operator, which I do not create explicitly. It is
> generated because I use [{{tableEnv.fromChangelogStream(stream, schema,
> ChangelogMode.upsert())}}|https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.html#fromChangelogStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.table.api.Schema-org.apache.flink.table.connector.ChangelogMode-]
> (the upsert mode is important, other modes do not fail). The table created
> is passed to an SQL query using the SQL API, which generates something like:
> {quote}{{ChangelogNormalize[8] -> Calc[9] -> TableToDataSteam ->
> [my_sql_transformation] -> [my_sink]}}
> {quote}
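A plausible reading of why only upsert mode fails comes from what {{ChangelogNormalize}} does. An upsert stream carries only the latest row per primary key and no UPDATE_BEFORE messages, so the planner adds a stateful operator that remembers the last row per key and emits the missing retractions. The class below is a simplified, hypothetical model of that behavior: the names and the string-valued rows are invented, and details of Flink's implementation (state TTL, duplicate handling) are ignored. The map it keeps is the kind of keyed state that must be found again under the same operator ID when restoring from a savepoint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChangelogNormalizeSketch {
    // Row kinds of a full changelog; an upsert source only produces
    // INSERT/UPDATE_AFTER and DELETE, never UPDATE_BEFORE.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Change {
        final Kind kind;
        final String key;
        final String value; // may be null for deletes that carry only the key

        Change(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return kind + "(" + key + "=" + value + ")";
        }
    }

    // Keyed state: the last row seen per primary key.
    private final Map<String, String> lastValueByKey = new HashMap<>();

    // Turns one upsert message into the equivalent full-changelog messages.
    List<Change> process(Change in) {
        List<Change> out = new ArrayList<>();
        String previous = lastValueByKey.get(in.key);
        if (in.kind == Kind.DELETE) {
            if (previous != null) { // a delete for an unknown key is dropped
                lastValueByKey.remove(in.key);
                out.add(new Change(Kind.DELETE, in.key, previous));
            }
        } else if (previous == null) { // first row for this key
            lastValueByKey.put(in.key, in.value);
            out.add(new Change(Kind.INSERT, in.key, in.value));
        } else { // update: emit the retraction the upsert stream did not carry
            lastValueByKey.put(in.key, in.value);
            out.add(new Change(Kind.UPDATE_BEFORE, in.key, previous));
            out.add(new Change(Kind.UPDATE_AFTER, in.key, in.value));
        }
        return out;
    }

    public static void main(String[] args) {
        ChangelogNormalizeSketch normalize = new ChangelogNormalizeSketch();
        List<Change> upserts = List.of(
                new Change(Kind.UPDATE_AFTER, "user-1", "v1"),
                new Change(Kind.UPDATE_AFTER, "user-1", "v2"),
                new Change(Kind.DELETE, "user-1", null));
        for (Change c : upserts) {
            System.out.println(c + " -> " + normalize.process(c));
        }
    }
}
```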
> In previous versions of Flink it seems this operator was always given the
> same uid so the state could match when starting from the savepoint. In Flink
> 1.15.1, I see that a different uid is generated every time. I could not find
> a reliable way to set that uid manually. The only way I found was by going
> backwards from the transformation:
> {quote}{{dataStream.getTransformation().getInputs().get(0).getInputs().get(0).getInputs().get(0).setUid("the_user_defined_id");}}
> {quote}
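The chain of {{get(0)}} calls above depends on the exact number of transformations between the stream and {{ChangelogNormalize}}. A somewhat more robust variant searches upstream by name instead. To keep the sketch runnable without Flink on the classpath, {{Node}} below is a hypothetical stand-in that mirrors the {{getName()}}, {{getInputs()}} and {{setUid(String)}} methods of Flink's {{Transformation}} class. It also assumes that the generated transformation's name starts with {{ChangelogNormalize}}, as in the plan printed above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

class UidByNameSketch {
    // Hypothetical stand-in for Flink's Transformation, reduced to the
    // methods the workaround relies on (plus a getter for checking the result).
    static final class Node {
        private final String name;
        private final List<Node> inputs;
        private String uid;

        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = List.of(inputs);
        }

        String getName() { return name; }
        List<Node> getInputs() { return inputs; }
        void setUid(String uid) { this.uid = uid; }
        String getUid() { return uid; }
    }

    // Breadth-first search upstream from start; sets the UID on the first node
    // whose name starts with the given prefix, whatever its depth.
    static Optional<Node> setUidByNamePrefix(Node start, String prefix, String uid) {
        Deque<Node> queue = new ArrayDeque<>();
        Set<Node> seen = new HashSet<>(); // guards against visiting shared inputs twice
        queue.add(start);
        while (!queue.isEmpty()) {
            Node current = queue.poll();
            if (!seen.add(current)) {
                continue;
            }
            if (current.getName().startsWith(prefix)) {
                current.setUid(uid);
                return Optional.of(current);
            }
            queue.addAll(current.getInputs());
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Loosely mirrors the chain from the plan above.
        Node source = new Node("Source");
        Node normalize = new Node("ChangelogNormalize[8]", source);
        Node calc = new Node("Calc[9]", normalize);
        Node toStream = new Node("TableToDataSteam", calc);

        setUidByNamePrefix(toStream, "ChangelogNormalize", "the_user_defined_id");
        System.out.println(normalize.getUid());
    }
}
```

Against a real job, the same loop would start from {{dataStream.getTransformation()}}. On 1.15.2 and later, the fix above should make this unnecessary for regular Table API programs.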
--
This message was sent by Atlassian Jira
(v8.20.10#820010)