[
https://issues.apache.org/jira/browse/FLINK-31143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias Pohl updated FLINK-31143:
----------------------------------
Description:
I tried to run the following case:
{code:java}
public static void main(String[] args) throws Exception {
final String createTableQuery =
"CREATE TABLE left_table (a int, c varchar) "
+ "WITH ("
+ " 'connector' = 'datagen', "
+ " 'rows-per-second' = '1', "
+ " 'fields.a.kind' = 'sequence', "
+ " 'fields.a.start' = '0', "
+ " 'fields.a.end' = '100000'"
+ ");";
final String selectQuery = "SELECT * FROM left_table;";
final Configuration initialConfig = new Configuration();
initialConfig.set(CoreOptions.DEFAULT_PARALLELISM, 1);
final EnvironmentSettings initialSettings =
EnvironmentSettings.newInstance()
.inStreamingMode()
.withConfiguration(initialConfig)
.build();
final TableEnvironment initialTableEnv =
TableEnvironment.create(initialSettings);
initialTableEnv.executeSql(createTableQuery);
final TableResult tableResult =
initialTableEnv.sqlQuery(selectQuery).execute();
tableResult.await();
final String savepointPath;
try (CloseableIterator<Row> tableResultIterator =
tableResult.collect()) {
// consume two results
System.out.println(tableResultIterator.next());
System.out.println(tableResultIterator.next());
final JobClient jobClient =
tableResult.getJobClient().orElseThrow(IllegalStateException::new);
final File savepointDirectory = Files.createTempDir();
savepointPath =
jobClient
.stopWithSavepoint(
true,
savepointDirectory.getAbsolutePath(),
SavepointFormatType.CANONICAL)
.get();
}
final SavepointRestoreSettings savepointRestoreSettings =
SavepointRestoreSettings.forPath(savepointPath, true);
final Configuration restartConfig = new Configuration(initialConfig);
SavepointRestoreSettings.toConfiguration(savepointRestoreSettings,
restartConfig);
final EnvironmentSettings restartSettings =
EnvironmentSettings.newInstance()
.inStreamingMode()
.withConfiguration(restartConfig)
.build();
final TableEnvironment restartTableEnv =
TableEnvironment.create(restartSettings);
restartTableEnv.executeSql(createTableQuery);
restartTableEnv.sqlQuery(selectQuery).execute().print();
}
{code}
h3. Expected behavior
The job continues, skipping the initial two records, and starts printing results
from a = 2 onwards.
h3. Observed behavior
No results are printed. The logs show that an invalid request was handled:
{code}
org.apache.flink.streaming.api.operators.collect.CollectSinkFunction [] -
Invalid request. Received version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1,
offset = 0, while expected version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1,
offset = 1
{code}
It looks like the right offset is not picked up from the savepoint (see
[CollectSinkFunction:411|https://github.com/apache/flink/blob/5ae8cb0503449b07f76d0ab621c3e81734496b26/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L411]).
was:
I tried to run the following case:
{code:java}
public static void main(String[] args) throws Exception {
final String createTableQuery =
"CREATE TABLE left_table (a int, c varchar) "
+ "WITH ("
+ " 'connector' = 'datagen', "
+ " 'rows-per-second' = '1', "
+ " 'fields.a.kind' = 'sequence', "
+ " 'fields.a.start' = '0', "
+ " 'fields.a.end' = '100000'"
+ ");";
final String selectQuery = "SELECT * FROM left_table;";
final Configuration initialConfig = new Configuration();
initialConfig.set(CoreOptions.DEFAULT_PARALLELISM, 1);
final EnvironmentSettings initialSettings =
EnvironmentSettings.newInstance()
.inStreamingMode()
.withConfiguration(initialConfig)
.build();
final TableEnvironment initialTableEnv =
TableEnvironment.create(initialSettings);
initialTableEnv.executeSql(createTableQuery);
final TableResult tableResult =
initialTableEnv.sqlQuery(selectQuery).execute();
tableResult.await();
final String savepointPath;
try (CloseableIterator<Row> tableResultIterator =
tableResult.collect()) {
// consume two results
System.out.println(tableResultIterator.next());
System.out.println(tableResultIterator.next());
final JobClient jobClient =
tableResult.getJobClient().orElseThrow(IllegalStateException::new);
final File savepointDirectory = Files.createTempDir();
savepointPath =
jobClient
.stopWithSavepoint(
true,
savepointDirectory.getAbsolutePath(),
SavepointFormatType.CANONICAL)
.get();
}
final SavepointRestoreSettings savepointRestoreSettings =
SavepointRestoreSettings.forPath(savepointPath, true);
final Configuration restartConfig = new Configuration(initialConfig);
SavepointRestoreSettings.toConfiguration(savepointRestoreSettings,
restartConfig);
final EnvironmentSettings restartSettings =
EnvironmentSettings.newInstance()
.inStreamingMode()
.withConfiguration(restartConfig)
.build();
final TableEnvironment restartTableEnv =
TableEnvironment.create(restartSettings);
restartTableEnv.executeSql(createTableQuery);
restartTableEnv.sqlQuery(selectQuery).execute().print();
}
{code}
Expected behavior: The job continues, skipping the initial two records, and starts
printing results from a = 2 onwards.
Observed behavior:
No results are printed. The logs show that an invalid request was handled:
{code}
org.apache.flink.streaming.api.operators.collect.CollectSinkFunction [] -
Invalid request. Received version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1,
offset = 0, while expected version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1,
offset = 1
{code}
It looks like the right offset is not picked up from the savepoint (see
[CollectSinkFunction:411|https://github.com/apache/flink/blob/5ae8cb0503449b07f76d0ab621c3e81734496b26/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L411]).
> Invalid request: offset doesn't match when restarting from a savepoint
> ----------------------------------------------------------------------
>
> Key: FLINK-31143
> URL: https://issues.apache.org/jira/browse/FLINK-31143
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.17.0, 1.15.3, 1.16.1
> Reporter: Matthias Pohl
> Priority: Critical
>
> I tried to run the following case:
> {code:java}
> public static void main(String[] args) throws Exception {
> final String createTableQuery =
> "CREATE TABLE left_table (a int, c varchar) "
> + "WITH ("
> + " 'connector' = 'datagen', "
> + " 'rows-per-second' = '1', "
> + " 'fields.a.kind' = 'sequence', "
> + " 'fields.a.start' = '0', "
> + " 'fields.a.end' = '100000'"
> + ");";
> final String selectQuery = "SELECT * FROM left_table;";
> final Configuration initialConfig = new Configuration();
> initialConfig.set(CoreOptions.DEFAULT_PARALLELISM, 1);
> final EnvironmentSettings initialSettings =
> EnvironmentSettings.newInstance()
> .inStreamingMode()
> .withConfiguration(initialConfig)
> .build();
> final TableEnvironment initialTableEnv =
> TableEnvironment.create(initialSettings);
> initialTableEnv.executeSql(createTableQuery);
> final TableResult tableResult =
> initialTableEnv.sqlQuery(selectQuery).execute();
> tableResult.await();
> final String savepointPath;
> try (CloseableIterator<Row> tableResultIterator =
> tableResult.collect()) {
> // consume two results
> System.out.println(tableResultIterator.next());
> System.out.println(tableResultIterator.next());
> final JobClient jobClient =
> tableResult.getJobClient().orElseThrow(IllegalStateException::new);
> final File savepointDirectory = Files.createTempDir();
> savepointPath =
> jobClient
> .stopWithSavepoint(
> true,
> savepointDirectory.getAbsolutePath(),
> SavepointFormatType.CANONICAL)
> .get();
> }
> final SavepointRestoreSettings savepointRestoreSettings =
> SavepointRestoreSettings.forPath(savepointPath, true);
> final Configuration restartConfig = new Configuration(initialConfig);
> SavepointRestoreSettings.toConfiguration(savepointRestoreSettings,
> restartConfig);
> final EnvironmentSettings restartSettings =
> EnvironmentSettings.newInstance()
> .inStreamingMode()
> .withConfiguration(restartConfig)
> .build();
> final TableEnvironment restartTableEnv =
> TableEnvironment.create(restartSettings);
> restartTableEnv.executeSql(createTableQuery);
> restartTableEnv.sqlQuery(selectQuery).execute().print();
> }
> {code}
> h3. Expected behavior
> The job continues, skipping the initial two records, and starts printing results
> from a = 2 onwards.
> h3. Observed behavior
> No results are printed. The logs show that an invalid request was handled:
> {code}
> org.apache.flink.streaming.api.operators.collect.CollectSinkFunction [] -
> Invalid request. Received version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1,
> offset = 0, while expected version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1,
> offset = 1
> {code}
> It looks like the right offset is not picked up from the savepoint (see
> [CollectSinkFunction:411|https://github.com/apache/flink/blob/5ae8cb0503449b07f76d0ab621c3e81734496b26/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L411]).
--
This message was sent by Atlassian Jira
(v8.20.10#820010)