Hi Guenther,

sorry for overlooking your colleague's email.

I think the answer to your problem is twofold. The underlying problem is
that your query seems to use `RowData` as a key for some keyed operation.
Since changing the key format might entail that keys need to be
partitioned differently, Flink does not support changing the key format.
That is why Flink also fails if the key format is only compatible after
migration. There is a small warning about this on the state evolution
page [1].
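
To make that concrete, the restore path effectively performs a check
along the following lines. This is a paraphrased sketch of the behaviour
described further down in this thread, not the literal Flink source; the
serializer classes and the exception are real Flink types, but the helper
class and method names are made up.

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.util.StateMigrationException;

// Illustrative helper, not Flink source code.
final class KeySerializerCheck {

    static <K> void checkKeyCompatibility(
            TypeSerializerSnapshot<K> restoredKeySnapshot,
            TypeSerializer<K> newKeySerializer) throws StateMigrationException {

        TypeSerializerSchemaCompatibility<K> compat =
                restoredKeySnapshot.resolveSchemaCompatibility(newKeySerializer);

        // The key format must stay exactly the same, because a migrated key
        // format could partition keys differently across key groups. That is
        // why "compatible after migration" is rejected just like
        // "incompatible" for key serializers.
        if (compat.isCompatibleAfterMigration() || compat.isIncompatible()) {
            throw new StateMigrationException(
                    "The new key serializer must be compatible.");
        }
    }
}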

The other part of the answer is that Flink does not support strict
backwards compatibility for SQL queries, if I am not mistaken (please
chime in if this is no longer correct @Timo Walther <twal...@apache.org>
and @j...@apache.org <j...@apache.org>). The problem is that queries
might result in different execution plans after a version upgrade, which
then cannot be mapped to the old state. Admittedly, in this case it
should have been possible, but changing the `RowData` type, which is used
as a key, breaks backwards compatibility. What is a bit confusing is that
FLINK-16998 explicitly states that this change does not break backwards
compatibility.
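
For reference, a minimal illustration of what FLINK-16998 changed. The
field values below are made up; the point is only that every `Row` now
carries an additional `RowKind` flag next to its fields, which changes
the serialized format compared to 1.9.

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class RowKindExample {
    public static void main(String[] args) {
        // The change flag that FLINK-16998 added to Row.
        Row row = new Row(2);
        row.setField(0, "some-key");
        row.setField(1, 42L);
        row.setKind(RowKind.INSERT); // new in 1.11, serialized alongside the fields

        System.out.println(row.getKind()); // prints INSERT
    }
}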

As a workaround, you could try Flink's State Processor API [2], which
allows you to rewrite savepoints.
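
Below is a minimal sketch of the general read/rewrite pattern with the
DataSet-based State Processor API. Everything specific in it is made up:
the operator UID, the savepoint paths, the state name and the key/value
types are placeholders, and for a Table/SQL job you would first have to
figure out the operator UIDs and state names that the planner generated.
Treat it as a starting point rather than a drop-in solution.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class SavepointRewrite {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the keyed state of one operator from the old savepoint.
        ExistingSavepoint oldSavepoint =
                Savepoint.load(env, "hdfs:///savepoints/old", new MemoryStateBackend());

        DataSet<Tuple2<String, Long>> counts =
                oldSavepoint.readKeyedState("my-operator-uid", new CountReader());

        // Re-bootstrap the state and write it into a new savepoint.
        BootstrapTransformation<Tuple2<String, Long>> transformation =
                OperatorTransformation.bootstrapWith(counts)
                        .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                            @Override
                            public String getKey(Tuple2<String, Long> value) {
                                return value.f0;
                            }
                        })
                        .transform(new CountWriter());

        Savepoint.create(new MemoryStateBackend(), 128)
                .withOperator("my-operator-uid", transformation)
                .write("hdfs:///savepoints/new");

        env.execute("rewrite savepoint");
    }

    // Reads a ValueState named "count" for every key of the operator.
    static class CountReader extends KeyedStateReaderFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Tuple2<String, Long>> out)
                throws Exception {
            out.collect(Tuple2.of(key, count.value()));
        }
    }

    // Writes the extracted values back into a ValueState named "count".
    static class CountWriter extends KeyedStateBootstrapFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx) throws Exception {
            count.update(value.f1);
        }
    }
}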

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

Cheers,
Till

On Wed, Dec 30, 2020 at 3:30 AM Guenther Starnberger <fl...@sysfrog.org>
wrote:

> Hello,
>
> We're currently running into an issue upgrading the state of an
> application to Flink 1.11 and I think this could be caused by a
> potential backwards incompatibility that was introduced with Flink
> 1.11. A colleague of mine recently posted about it on the users list
> (without a response), but I'd like to bring this up here on the dev
> list in order to figure out if that incompatibility is intended
> behavior and/or a known issue.
>
> We're seeing that issue when trying to load the RocksDB state from
> Flink 1.9 into Flink 1.11 with an application that uses the Flink
> table environment. Immediately after startup
> RocksDBFullRestoreOperation#restoreKVStateMetaData raises a "The new
> key serializer must be compatible" exception.
>
> The reason for that seems to be that FLINK-16998 changed the row
> serialization format by adding a new Row#getKind field. There's a
> legacy mode of the row serializer but that's only used for reading the
> existing snapshot. As a consequence,
> RowSerializerSnapshot#resolveOuterSchemaCompatibility will always
> return COMPATIBLE_AFTER_MIGRATION when reading a Flink 1.9 savepoint
> with Flink 1.11.
>
> The problem is that AbstractRocksDBRestoreOperation#readMetaData
> treats "compatible after migration" the same as "incompatible" and
> throws a "The new key serializer must be compatible." exception if it
> encounters that result.
>
> Is it expected that the introduction of Row#getKind breaks existing
> older state or is that a bug? So far I only reproduced this issue in a
> somewhat more complex codebase, but in case this is an unknown issue
> or not the intended behavior I can try to provide a small testcase (to
> rule out that anything in our own code triggers that issue).
>
> Example of a query that triggers that issue:
> https://gist.github.com/gstarnberger/f19a30df179c72a622490cbb041adb21
>
> Full stacktrace:
> https://gist.github.com/gstarnberger/336d94e723b3ec599d09e17dd7d0ee00
>
> - Guenther
>
