[ https://issues.apache.org/jira/browse/FLINK-23953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Iaroslav Zeigerman updated FLINK-23953:
---------------------------------------
Description:
The problematic code is located
[here|https://github.com/apache/flink/blob/release-1.13.2/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java#L194].
Returning COMPATIBLE_AFTER_MIGRATION seems completely unnecessary and causes
issues during schema evolution; the COMPATIBLE_AS_IS status should be returned
instead. Even the comment right above the linked snippet suggests as much:
{noformat}
// The new serializer would be able to read data persisted with *this*
// serializer, therefore no migration is required.
{noformat}
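For context, below is a simplified sketch of the resolution logic in question and the
proposed change. It is not a verbatim copy of the Flink source; the class and method
names are illustrative.
{noformat}
// Simplified sketch, NOT the actual AvroSerializerSnapshot code.
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;

final class AvroCompatibilitySketch {

    static <T> TypeSerializerSchemaCompatibility<T> resolve(Schema writerSchema, Schema readerSchema) {
        // writerSchema: schema captured in the snapshot (the old data)
        // readerSchema: schema of the new serializer
        if (writerSchema.equals(readerSchema)) {
            return TypeSerializerSchemaCompatibility.compatibleAsIs();
        }

        SchemaPairCompatibility compatibility =
                SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);

        if (compatibility.getType() == SchemaCompatibilityType.COMPATIBLE) {
            // Current behavior: compatibleAfterMigration(), which forces a state
            // migration even though the new serializer can already read the old data.
            // Proposed behavior: treat this case as compatible as-is.
            return TypeSerializerSchemaCompatibility.compatibleAsIs();
        }

        return TypeSerializerSchemaCompatibility.incompatible();
    }
}
{noformat}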
This issue leads to Flink failures when a new optional field is added to a schema.
The following happens in this case (a minimal sketch of the failing step follows the list):
# Records in state are deserialized successfully using the old serializer (with the old schema).
# The schema change triggers a state migration because of the code path linked above.
# RocksDBKeyedStateBackend attempts to re-serialize a record that still carries the old schema using the new schema.
# The latter operation fails because the new schema refers to field positions that do not exist in the old record.
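A minimal, self-contained sketch of that failing write (the record and field names are
hypothetical and only illustrate the scenario):
{noformat}
// Hypothetical schemas; only meant to illustrate the failure mode described above.
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public final class OldRecordNewSchemaWrite {
    public static void main(String[] args) throws Exception {
        Schema oldSchema = SchemaBuilder.record("Event").fields()
                .requiredString("id")
                .endRecord();
        // The new schema adds an optional field with a default: a compatible evolution.
        Schema newSchema = SchemaBuilder.record("Event").fields()
                .requiredString("id")
                .optionalString("note")
                .endRecord();

        // A record restored from state still has the old schema's layout.
        GenericRecord oldRecord = new GenericData.Record(oldSchema);
        oldRecord.put("id", "some-id");

        // The forced migration effectively re-serializes it with the new schema...
        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(newSchema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);

        // ...which throws ArrayIndexOutOfBoundsException: the writer looks up the
        // new schema's extra field by position, but the old record has no such slot.
        writer.write(oldRecord, encoder);
        encoder.flush();
    }
}
{noformat}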
The failure occurs with a stack trace that looks something like this:
{noformat}
Caused by: org.apache.flink.util.StateMigrationException: Error while trying to migrate RocksDB list state.
    at org.apache.flink.contrib.streaming.state.RocksDBListState.migrateSerializedValue(RocksDBListState.java:269)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:630)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:559)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:509)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:670)
    at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
    ... 18 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 10
    at org.apache.avro.generic.GenericData$Record.get(GenericData.java:261)
    at org.apache.avro.generic.GenericData.getField(GenericData.java:825)
    at org.apache.avro.generic.GenericData.getField(GenericData.java:844)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:204)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:186)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.migrateSerializedValue(RocksDBListState.java:263)
    ... 28 more
{noformat}
If Flink skipped the migration in this case and simply used the new schema to
deserialize the old records, no such issue would have occurred.
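For comparison, here is a sketch (same hypothetical schemas as in the earlier snippet)
showing that data written with the old schema can be read directly with the new schema,
which is the behavior the previous paragraph argues for:
{noformat}
// Sketch with hypothetical schemas: read old data with writer = old schema,
// reader = new schema. No migration step is involved.
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public final class ReadOldDataWithNewSchema {
    public static void main(String[] args) throws Exception {
        Schema oldSchema = SchemaBuilder.record("Event").fields()
                .requiredString("id")
                .endRecord();
        Schema newSchema = SchemaBuilder.record("Event").fields()
                .requiredString("id")
                .optionalString("note")
                .endRecord();

        // Serialize a record with the OLD schema (what is already in state).
        GenericRecord oldRecord = new GenericData.Record(oldSchema);
        oldRecord.put("id", "some-id");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(oldSchema).write(oldRecord, encoder);
        encoder.flush();

        // Deserialize with writer = old schema, reader = new schema: Avro's schema
        // resolution fills the missing optional field with its default value.
        GenericDatumReader<GenericRecord> reader =
                new GenericDatumReader<>(oldSchema, newSchema);
        GenericRecord evolved = reader.read(
                null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
        System.out.println(evolved); // {"id": "some-id", "note": null}
    }
}
{noformat}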
> AvroSerializerSnapshot causes state migration even though the old and the new
> schemas are compatible
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-23953
> URL: https://issues.apache.org/jira/browse/FLINK-23953
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Affects Versions: 1.13.2
> Reporter: Iaroslav Zeigerman
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)