[ https://issues.apache.org/jira/browse/FLINK-28622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872855#comment-17872855 ]
Ammu Parvathy commented on FLINK-28622:
---------------------------------------
[~norris3n] Having analysed the issue, I think this would be the right test case to
start with:
[https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java#L115].
This test specifically covers scenarios where the coordinator state changes
between the savepoint and the restarted job. Given that neither the Flink
version nor the SQL statement has changed, one possible cause could be a change
in the configuration of your Kafka source connector.
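The error in the quoted report comes from the restore step: each operator's state in the savepoint is matched by operator ID against the operators of the new job, and state with no matching operator aborts the restore unless `--allowNonRestoredState` is set, as the message itself says. The sketch below is a simplified, standard-library-only model of that check for illustration. `SavepointMappingCheck` and `mapState` are hypothetical names, not Flink's implementation, which also handles details (such as the coordinator state covered by the test above) that this model ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified model (hypothetical, NOT Flink's code) of the check behind
// "Cannot map checkpoint/savepoint state for operator ... to the new program".
final class SavepointMappingCheck {

    private SavepointMappingCheck() {}

    /**
     * Returns the operator IDs whose savepoint state can be assigned to the new program.
     * If an ID from the savepoint has no matching operator, this throws unless
     * allowNonRestoredState is true, in which case that state is silently skipped.
     */
    static List<String> mapState(Set<String> savepointOperatorIds,
                                 Set<String> newProgramOperatorIds,
                                 boolean allowNonRestoredState) {
        List<String> restored = new ArrayList<>();
        for (String operatorId : savepointOperatorIds) {
            if (newProgramOperatorIds.contains(operatorId)) {
                // The new program still has this operator: its state is restored.
                restored.add(operatorId);
            } else if (!allowNonRestoredState) {
                // Orphaned state without the override: the whole restore fails.
                throw new IllegalStateException(
                        "Cannot map checkpoint/savepoint state for operator " + operatorId
                                + " to the new program, because the operator is not available"
                                + " in the new program.");
            }
            // Otherwise the orphaned state is dropped and the job starts without it.
        }
        return restored;
    }
}
```

With `allowNonRestoredState` set to true, the state for `dd5fc1f28f42d777f818e2e8ea18c331` would simply be dropped. That avoids the error but does not explain why the operator ID no longer exists in the new program.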
> Can't restore a flink job that uses Table API and Kafka connector with
> savepoint
> --------------------------------------------------------------------------------
>
> Key: FLINK-28622
> URL: https://issues.apache.org/jira/browse/FLINK-28622
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.15.0
> Reporter: Nathan
> Priority: Critical
>
> I canceled a Flink job with a savepoint and then tried to restore the job from
> that savepoint (using the same jar file), but it said it cannot map the
> savepoint state. Since I was using the same jar file, shouldn't the execution
> plan and the generated operator IDs be the same? (The Flink version has not
> changed.)
>
> Related errors:
> {code:java}
> Caused by: java.util.concurrent.CompletionException:
> java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint
> file:/root/flink-savepoints/savepoint-5f285c-c2749410db07. Cannot map
> checkpoint/savepoint state for operator dd5fc1f28f42d777f818e2e8ea18c331 to
> the new program, because the operator is not available in the new program. If
> you want to allow to skip this, you can set the --allowNonRestoredState
> option on the CLI.
> Caused by: java.lang.IllegalStateException: Failed to rollback to
> checkpoint/savepoint
> file:/root/flink-savepoints/savepoint-5f285c-c2749410db07. Cannot map
> checkpoint/savepoint state for operator dd5fc1f28f42d777f818e2e8ea18c331 to
> the new program, because the operator is not available in the new program. If
> you want to allow to skip this, you can set the --allowNonRestoredState
> option on the CLI. {code}
> My code:
> {code:java}
> import java.time.ZoneId;
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> public final class FlinkJob {
>     public static void main(String[] args) {
>         final String JOB_NAME = "FlinkJob";
>         // Pure Table API program running in streaming mode.
>         final EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
>         final TableEnvironment tEnv = TableEnvironment.create(settings);
>         tEnv.getConfig().set("pipeline.name", JOB_NAME);
>         tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
>
>         // Kafka source table; the three VIRTUAL columns expose Kafka record metadata.
>         tEnv.executeSql("CREATE TEMPORARY TABLE ApiLog (" +
>                 " `_timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL," +
>                 " `_partition` INT METADATA FROM 'partition' VIRTUAL," +
>                 " `_offset` BIGINT METADATA FROM 'offset' VIRTUAL," +
>                 " `Data` STRING," +
>                 " `Action` STRING," +
>                 " `ProduceDateTime` TIMESTAMP_LTZ(6)," +
>                 " `OffSet` INT" +
>                 ") WITH (" +
>                 " 'connector' = 'kafka'," +
>                 " 'topic' = 'api.log'," +
>                 " 'properties.group.id' = 'flink'," +
>                 " 'properties.bootstrap.servers' = '<mykafkahost...>'," +
>                 " 'format' = 'json'," +
>                 " 'json.timestamp-format.standard' = 'ISO-8601'" +
>                 ")");
>
>         // Sink table that prints every row to stdout.
>         tEnv.executeSql("CREATE TABLE print_table (" +
>                 " `_timestamp` TIMESTAMP(3)," +
>                 " `_partition` INT," +
>                 " `_offset` BIGINT," +
>                 " `Data` STRING," +
>                 " `Action` STRING," +
>                 " `ProduceDateTime` TIMESTAMP(6)," +
>                 " `OffSet` INT" +
>                 ") WITH ('connector' = 'print')");
>
>         // Submits the streaming INSERT job (Kafka source -> print sink).
>         tEnv.executeSql("INSERT INTO print_table" +
>                 " SELECT * FROM ApiLog");
>     }
> }
> {code}
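The reporter's expectation rests on how operator IDs are assigned. According to Flink's savepoint documentation, operators without an explicitly assigned uid get automatically generated IDs that depend on the structure of the program, so an identical plan should produce identical IDs while a structural change produces new ones. The toy model below only illustrates that property. `ToyOperatorIds` is a hypothetical class, and hashing a traversal position plus upstream IDs with MD5 is an assumption chosen for brevity, not Flink's actual hashing scheme.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

// Toy model (hypothetical, NOT Flink's algorithm): derive a 32-hex-character ID
// from an operator's position in the graph and its upstream operators' IDs,
// so an identical program structure always yields identical IDs.
final class ToyOperatorIds {

    private ToyOperatorIds() {}

    static String idFor(int traversalPosition, List<String> upstreamIds) {
        try {
            // MD5 yields a 16-byte digest, i.e. 32 hex characters like the ID in the log.
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            // A '|' separator keeps e.g. position 1 + "2..." distinct from position 12 + "...".
            md5.update((traversalPosition + "|").getBytes(StandardCharsets.UTF_8));
            for (String upstream : upstreamIds) {
                md5.update((upstream + "|").getBytes(StandardCharsets.UTF_8));
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : md5.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide MD5, so this should not happen.
            throw new IllegalStateException(e);
        }
    }
}
```

Calling `idFor(1, List.of(sourceId))` twice returns the same string, while `idFor(2, List.of(sourceId))` returns a different one, mirroring how a change in the generated plan would leave old savepoint state without a matching operator.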
--
This message was sent by Atlassian Jira
(v8.20.10#820010)