[ https://issues.apache.org/jira/browse/FLINK-28622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569380#comment-17569380 ]
Martijn Visser commented on FLINK-28622:
----------------------------------------
[~renqs] Any idea why this would fail? I'm under the impression that if the
Flink version, the SQL statement and everything else stay the same, restoring
from a savepoint should be supported?
> Can't restore a flink job that uses Table API and Kafka connector with savepoint
> --------------------------------------------------------------------------------
>
> Key: FLINK-28622
> URL: https://issues.apache.org/jira/browse/FLINK-28622
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.15.0
> Reporter: Nathan
> Priority: Critical
>
> I canceled a Flink job with a savepoint, then tried to restore the job from
> that savepoint using the same jar file, but the restore failed because the
> savepoint state could not be mapped. Since I used the same jar and the Flink
> version has not changed, I would expect the execution plan and the generated
> operator IDs to be identical.
>
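> Operator state in a savepoint is matched to the new job by operator ID. For
> comparison only, here is a minimal DataStream-API sketch (illustrative class
> and uid names, not part of this Table API job) of how such IDs are pinned
> explicitly with uid(); a Table API / SQL job instead gets auto-generated IDs
> from the translated plan:
> {code:java}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public final class UidSketch {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.fromElements("a", "b", "c")
>                 .uid("source-elements")   // stable ID used when mapping savepoint state
>                 .map(String::toUpperCase)
>                 .uid("uppercase-map")     // without uid(), the ID is a hash over the job graph
>                 .print();
>         env.execute("uid-sketch");
>     }
> }
> {code}
>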
> Related errors:
> {code:java}
> Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint file:/root/flink-savepoints/savepoint-5f285c-c2749410db07. Cannot map checkpoint/savepoint state for operator dd5fc1f28f42d777f818e2e8ea18c331 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
> Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint file:/root/flink-savepoints/savepoint-5f285c-c2749410db07. Cannot map checkpoint/savepoint state for operator dd5fc1f28f42d777f818e2e8ea18c331 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
> {code}
> My code:
> {code:java}
> import java.time.ZoneId;
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> public final class FlinkJob {
>     public static void main(String[] args) {
>         final String JOB_NAME = "FlinkJob";
>         final EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
>         final TableEnvironment tEnv = TableEnvironment.create(settings);
>         tEnv.getConfig().set("pipeline.name", JOB_NAME);
>         tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
>
>         tEnv.executeSql("CREATE TEMPORARY TABLE ApiLog (" +
>                 " `_timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL," +
>                 " `_partition` INT METADATA FROM 'partition' VIRTUAL," +
>                 " `_offset` BIGINT METADATA FROM 'offset' VIRTUAL," +
>                 " `Data` STRING," +
>                 " `Action` STRING," +
>                 " `ProduceDateTime` TIMESTAMP_LTZ(6)," +
>                 " `OffSet` INT" +
>                 ") WITH (" +
>                 " 'connector' = 'kafka'," +
>                 " 'topic' = 'api.log'," +
>                 " 'properties.group.id' = 'flink'," +
>                 " 'properties.bootstrap.servers' = '<mykafkahost...>'," +
>                 " 'format' = 'json'," +
>                 " 'json.timestamp-format.standard' = 'ISO-8601'" +
>                 ")");
>
>         tEnv.executeSql("CREATE TABLE print_table (" +
>                 " `_timestamp` TIMESTAMP(3)," +
>                 " `_partition` INT," +
>                 " `_offset` BIGINT," +
>                 " `Data` STRING," +
>                 " `Action` STRING," +
>                 " `ProduceDateTime` TIMESTAMP(6)," +
>                 " `OffSet` INT" +
>                 ") WITH ('connector' = 'print')");
>
>         tEnv.executeSql("INSERT INTO print_table SELECT * FROM ApiLog");
>     }
> }
> {code}
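>
> Flink 1.15 also ships an experimental CompiledPlan feature (FLIP-190) meant to
> keep a Table API plan, and the operator IDs derived from it, stable across
> submissions. The following is only a hedged sketch of that API, using the
> built-in datagen/blackhole connectors and an illustrative file path; it is not
> a confirmed fix for this ticket:
> {code:java}
> import org.apache.flink.table.api.CompiledPlan;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.PlanReference;
> import org.apache.flink.table.api.TableEnvironment;
>
> public final class CompiledPlanSketch {
>     public static void main(String[] args) {
>         TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
>         tEnv.executeSql("CREATE TABLE src (x INT) WITH ('connector' = 'datagen')");
>         tEnv.executeSql("CREATE TABLE snk (x INT) WITH ('connector' = 'blackhole')");
>
>         // On the first submission: persist the optimized plan (and its operator IDs) to a file.
>         CompiledPlan plan = tEnv.compilePlanSql("INSERT INTO snk SELECT x FROM src");
>         plan.writeToFile("/tmp/insert-plan.json");
>
>         // On later submissions: execute the stored plan instead of re-planning the SQL.
>         tEnv.loadPlan(PlanReference.fromFile("/tmp/insert-plan.json")).execute();
>     }
> }
> {code}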
--
This message was sent by Atlassian Jira
(v8.20.10#820010)