[ 
https://issues.apache.org/jira/browse/FLINK-28622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569380#comment-17569380
 ] 

Martijn Visser commented on FLINK-28622:
----------------------------------------

[~renqs] Any idea why this would fail? My understanding is that restoring from 
a savepoint should be supported as long as the Flink version, the SQL 
statement, and everything else stay unchanged.

> Can't restore a flink job that uses Table API and Kafka connector with 
> savepoint
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-28622
>                 URL: https://issues.apache.org/jira/browse/FLINK-28622
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.15.0
>            Reporter: Nathan
>            Priority: Critical
>
> I canceled a Flink job with a savepoint, then tried to restore the job from 
> that savepoint using the same jar file, but the restore failed because it 
> could not map the savepoint state. Since the jar file and the Flink version 
> are unchanged, I would expect the execution plan and the generated operator 
> IDs to be the same.
>  
> Related errors:
> {code:java}
> Caused by: java.util.concurrent.CompletionException: 
> java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint 
> file:/root/flink-savepoints/savepoint-5f285c-c2749410db07. Cannot map 
> checkpoint/savepoint state for operator dd5fc1f28f42d777f818e2e8ea18c331 to 
> the new program, because the operator is not available in the new program. If 
> you want to allow to skip this, you can set the --allowNonRestoredState 
> option on the CLI.
> Caused by: java.lang.IllegalStateException: Failed to rollback to 
> checkpoint/savepoint 
> file:/root/flink-savepoints/savepoint-5f285c-c2749410db07. Cannot map 
> checkpoint/savepoint state for operator dd5fc1f28f42d777f818e2e8ea18c331 to 
> the new program, because the operator is not available in the new program. If 
> you want to allow to skip this, you can set the --allowNonRestoredState 
> option on the CLI. {code}
> My code:
> {code:java}
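> import java.time.ZoneId;
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> public final class FlinkJob {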
>     public static void main(String[] args) {
>         final String JOB_NAME = "FlinkJob";
>         final EnvironmentSettings settings =
>                 EnvironmentSettings.inStreamingMode();
>         final TableEnvironment tEnv = TableEnvironment.create(settings);
>         tEnv.getConfig().set("pipeline.name", JOB_NAME);
>         tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
>         tEnv.executeSql("CREATE TEMPORARY TABLE ApiLog (" +
>                 "  `_timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL," +
>                 "  `_partition` INT METADATA FROM 'partition' VIRTUAL," +
>                 "  `_offset` BIGINT METADATA FROM 'offset' VIRTUAL," +
>                 "  `Data` STRING," +
>                 "  `Action` STRING," +
>                 "  `ProduceDateTime` TIMESTAMP_LTZ(6)," +
>                 "  `OffSet` INT" +
>                 ") WITH (" +
>                 "  'connector' = 'kafka'," +
>                 "  'topic' = 'api.log'," +
>                 "  'properties.group.id' = 'flink'," +
>                 "  'properties.bootstrap.servers' = '<mykafkahost...>'," +
>                 "  'format' = 'json'," +
>                 "  'json.timestamp-format.standard' = 'ISO-8601'" +
>                 ")");
>         tEnv.executeSql("CREATE TABLE print_table (" +
>                 " `_timestamp` TIMESTAMP(3)," +
>                 " `_partition` INT," +
>                 " `_offset` BIGINT," +
>                 " `Data` STRING," +
>                 " `Action` STRING," +
>                 " `ProduceDateTime` TIMESTAMP(6)," +
>                 " `OffSet` INT" +
>                 ") WITH ('connector' = 'print')");
>         tEnv.executeSql("INSERT INTO print_table" +
>                 " SELECT * FROM ApiLog");
>     }
> } {code}
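
For readers unfamiliar with the error above, the check it describes can be sketched as a toy model. This is not Flink's actual restore code: the class `SavepointRestoreSketch`, the method `restore`, the string-valued "state", and the placeholder operator ID in `main` are all assumptions made for illustration. It only mirrors what the error message says: state in the savepoint is keyed by operator ID, and an ID that is missing from the new program fails the restore unless non-restored state is explicitly allowed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Toy model of the savepoint-to-program mapping check; NOT Flink's implementation. */
final class SavepointRestoreSketch {

    /**
     * Returns the savepoint state that could be mapped to the new program.
     *
     * @param savepointState        operator ID -> state (a plain string here, for illustration)
     * @param newProgramOperatorIds operator IDs present in the program being started
     * @param allowNonRestoredState models the --allowNonRestoredState CLI option
     */
    static Map<String, String> restore(
            Map<String, String> savepointState,
            Set<String> newProgramOperatorIds,
            boolean allowNonRestoredState) {
        Map<String, String> restored = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : savepointState.entrySet()) {
            String operatorId = entry.getKey();
            if (newProgramOperatorIds.contains(operatorId)) {
                // The operator still exists, so its state can be handed back to it.
                restored.put(operatorId, entry.getValue());
            } else if (!allowNonRestoredState) {
                // No matching operator and skipping is not allowed: fail the restore.
                throw new IllegalStateException(
                        "Cannot map checkpoint/savepoint state for operator "
                                + operatorId
                                + " to the new program, because the operator is not available"
                                + " in the new program.");
            }
            // Otherwise the unmatched state is silently dropped.
        }
        return restored;
    }

    public static void main(String[] args) {
        // Operator ID taken from the error above; the state value is a made-up placeholder.
        Map<String, String> savepoint =
                Map.of("dd5fc1f28f42d777f818e2e8ea18c331", "hypothetical-operator-state");
        // Hypothetical ID standing in for whatever the restarted job generated.
        Set<String> newProgram = Set.of("hypothetical-regenerated-id");

        // With skipping allowed, the unmatched state is dropped instead of failing.
        System.out.println(restore(savepoint, newProgram, true));

        try {
            restore(savepoint, newProgram, false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, allowing non-restored state only makes the restore proceed by discarding the unmatched state. It does not explain why the operator ID differs between the two runs, which is the open question in this issue.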



