[
https://issues.apache.org/jira/browse/FLINK-31143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17691249#comment-17691249
]
Matthias Pohl commented on FLINK-31143:
---------------------------------------
I didn't run this example in older Flink versions but had initially tested a
more generic example manually in 1.15.3 and 1.16.1. I saw similar behavior with
no results being printed. Therefore, I added the corresponding versions to this
Jira issue.
> Invalid request: offset doesn't match when restarting from a savepoint
> ----------------------------------------------------------------------
>
> Key: FLINK-31143
> URL: https://issues.apache.org/jira/browse/FLINK-31143
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.17.0, 1.15.3, 1.16.1
> Reporter: Matthias Pohl
> Priority: Critical
>
> I tried to run the following case:
> {code:java}
> public static void main(String[] args) throws Exception {
> final String createTableQuery =
> "CREATE TABLE left_table (a int, c varchar) "
> + "WITH ("
> + " 'connector' = 'datagen', "
> + " 'rows-per-second' = '1', "
> + " 'fields.a.kind' = 'sequence', "
> + " 'fields.a.start' = '0', "
> + " 'fields.a.end' = '100000'"
> + ");";
> final String selectQuery = "SELECT * FROM left_table;";
> final Configuration initialConfig = new Configuration();
> initialConfig.set(CoreOptions.DEFAULT_PARALLELISM, 1);
> final EnvironmentSettings initialSettings =
> EnvironmentSettings.newInstance()
> .inStreamingMode()
> .withConfiguration(initialConfig)
> .build();
> final TableEnvironment initialTableEnv =
> TableEnvironment.create(initialSettings);
> initialTableEnv.executeSql(createTableQuery);
> final TableResult tableResult =
> initialTableEnv.sqlQuery(selectQuery).execute();
> tableResult.await();
> final String savepointPath;
> try (CloseableIterator<Row> tableResultIterator =
> tableResult.collect()) {
> // consume two results
> System.out.println(tableResultIterator.next());
> System.out.println(tableResultIterator.next());
> final JobClient jobClient =
>
> tableResult.getJobClient().orElseThrow(IllegalStateException::new);
> final File savepointDirectory = Files.createTempDir();
> savepointPath =
> jobClient
> .stopWithSavepoint(
> true,
> savepointDirectory.getAbsolutePath(),
> SavepointFormatType.CANONICAL)
> .get();
> }
> final SavepointRestoreSettings savepointRestoreSettings =
> SavepointRestoreSettings.forPath(savepointPath, true);
> final Configuration restartConfig = new Configuration(initialConfig);
> SavepointRestoreSettings.toConfiguration(savepointRestoreSettings,
> restartConfig);
> final EnvironmentSettings restartSettings =
> EnvironmentSettings.newInstance()
> .inStreamingMode()
> .withConfiguration(restartConfig)
> .build();
> final TableEnvironment restartTableEnv =
> TableEnvironment.create(restartSettings);
> restartTableEnv.executeSql(createTableQuery);
> restartTableEnv.sqlQuery(selectQuery).execute().print();
> }
> {code}
> h3. Expected behavior
> The job continues omitting the inital two records and starts printing results
> from 2 onwards.
> h3. Observed behavior
> No results are printed. The logs show that an invalid request was handled:
> {code}
> org.apache.flink.streaming.api.operators.collect.CollectSinkFunction [] -
> Invalid request. Received version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1,
> offset = 0, while expected version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1,
> offset = 1
> {code}
> It looks like the right offset is not picked up from the savepoint (see
> [CollectSinkFunction:411|https://github.com/apache/flink/blob/5ae8cb0503449b07f76d0ab621c3e81734496b26/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L411]).
--
This message was sent by Atlassian Jira
(v8.20.10#820010)