[ https://issues.apache.org/jira/browse/FLINK-28861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17577337#comment-17577337 ]

Timo Walther commented on FLINK-28861:
--------------------------------------

Let me summarize the issue: Generating UIDs from a static AtomicInteger counter 
makes sense for plan compilation. There, the generated UID is immediately 
persisted in JSON and thus remains stable.

However, it should not be enabled by default for regular Table API jobs that 
potentially connect to the DataStream API. The current counter approach for UID 
generation causes issues when the same JVM translates multiple SQL/Table API 
pipelines. It is not easily possible to ensure uniqueness, as one DataStream 
API job could potentially consist of multiple SQL pipelines. The previous 
approach in 1.14 (viewing the entire StreamGraph and assigning the UIDs in a 
final step) makes more sense if the pipeline is not constructed from a compiled 
plan.
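The failure mode can be shown with a plain counter, independent of Flink. This is a minimal sketch only: the class {{UidCounterDemo}}, the {{translate}} method, and the {{uid-N}} format are made up for illustration and mimic (not reproduce) how a JVM-wide counter behaves across repeated translations:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: mimics a JVM-wide counter used for UID generation.
public class UidCounterDemo {

    // Static counter shared by every translation in this JVM.
    static final AtomicInteger COUNTER = new AtomicInteger(0);

    // "Translates" a pipeline of n operators, assigning one UID each.
    static List<String> translate(int numOperators) {
        List<String> uids = new ArrayList<>();
        for (int i = 0; i < numOperators; i++) {
            uids.add("uid-" + COUNTER.incrementAndGet());
        }
        return uids;
    }

    public static void main(String[] args) {
        List<String> first = translate(3);
        List<String> second = translate(3);
        // The same logical pipeline gets different UIDs on re-translation,
        // so state mapped by UID from a savepoint no longer matches.
        System.out.println(first.equals(second)); // prints "false"
    }
}
```

Because the counter never resets within the JVM, the second translation of an identical pipeline produces a disjoint set of UIDs, which is exactly why savepoint state cannot be mapped back.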

I would suggest reverting the change made in 1.15. Not all pipelines are 
affected by this bug; it works nicely in containerized environments that start 
a new JVM per job. I suggest introducing a new config option 
{{table.exec.uid-generation}} of type enum with the values {{PLAN_ONLY}} 
(default) and {{ALWAYS}} (restores the 1.15.0/1.15.1 behavior, for expert 
users who know what they are doing). {{PLAN_ONLY}} means that we use the 
behavior of {{table.exec.legacy-transformation-uids}} if the translation does 
not happen for a compiled plan.
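If the option lands as proposed, setting it from the Table API could look roughly like this. A sketch only: the key {{table.exec.uid-generation}} and its values exist only as a proposal in this comment, not in any released Flink version:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UidGenerationConfig {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Hypothetical option proposed above; PLAN_ONLY would be the default.
        // ALWAYS would restore the 1.15.0/1.15.1 counter-based behavior.
        tableEnv.getConfig()
                .getConfiguration()
                .setString("table.exec.uid-generation", "ALWAYS");
    }
}
```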

> Cannot resume from savepoint when using fromChangelogStream in upsert mode
> --------------------------------------------------------------------------
>
>                 Key: FLINK-28861
>                 URL: https://issues.apache.org/jira/browse/FLINK-28861
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.1
>            Reporter: Colin Smetz
>            Assignee: Timo Walther
>            Priority: Critical
>
> I want to use the savepoint mechanism to move existing jobs from one version 
> of Flink to another, by:
>  # Stopping a job with a savepoint
>  # Creating a new job from the savepoint, on the new version.
> In Flink 1.15.1, it fails, even when going from 1.15.1 to 1.15.1. I get this 
> error, meaning that it could not map the state from the previous job to the 
> new one because of one operator:
> {quote}{{Failed to rollback to checkpoint/savepoint 
> hdfs://hdfs-name:8020/flink-savepoints/savepoint-046708-238e921f5e78. Cannot 
> map checkpoint/savepoint state for operator d14a399e92154660771a806b90515d4c 
> to the new program, because the operator is not available in the new 
> program.}}
> {quote}
> After investigation, the problematic operator corresponds to a 
> {{ChangelogNormalize}} operator, which I do not explicitly create. It is 
> generated because I use [{{tableEnv.fromChangelogStream(stream, schema, 
> ChangelogMode.upsert())}}|https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.html#fromChangelogStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.table.api.Schema-org.apache.flink.table.connector.ChangelogMode-]
>  (the upsert mode is important; other modes do not fail). The resulting table 
> is passed to an SQL query using the SQL API, which generates something like:
> {quote}{{ChangelogNormalize[8] -> Calc[9] -> TableToDataStream -> 
> [my_sql_transformation] -> [my_sink]}}
> {quote}
> In previous versions of Flink, it seems this operator was always given the 
> same UID, so the state could be matched when starting from the savepoint. In 
> Flink 1.15.1, I see that a different UID is generated every time. I could 
> not find a reliable way to set that UID manually. The only way I found was 
> by going backwards from the transformation:
> {quote}{{dataStream.getTransformation().getInputs().get(0).getInputs().get(0).getInputs().get(0).setUid("the_user_defined_id");}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
