[ https://issues.apache.org/jira/browse/FLINK-23953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-23953:
-----------------------------------
    Labels: Avro AvroSchema pull-request-available  (was: Avro AvroSchema)

> AvroSerializerSnapshot causes state migration even though the old and the new schemas are compatible
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-23953
>                 URL: https://issues.apache.org/jira/browse/FLINK-23953
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System
>    Affects Versions: 1.13.2
>            Reporter: Iaroslav Zeigerman
>            Priority: Major
>              Labels: Avro, AvroSchema, pull-request-available
>
> The problematic code is located 
> [here|https://github.com/apache/flink/blob/release-1.13.2/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java#L194].
> Returning COMPATIBLE_AFTER_MIGRATION there seems unnecessary and causes 
> issues during schema evolution; the COMPATIBLE_AS_IS status should be 
> returned instead. Even the comment directly above the linked line says as 
> much:
> {noformat}
> // The new serializer would be able to read data persisted with *this*
> // serializer, therefore no migration is required.
> {noformat}
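>
> For context, here is a simplified sketch of the branch in question (this paraphrases the Flink 1.13.2 source for illustration; it is not a verbatim copy):
> {noformat}
> import org.apache.avro.Schema;
> import org.apache.avro.SchemaCompatibility;
> import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
> import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
>
> static <T> TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
>         Schema writerSchema, Schema readerSchema) {
>     // Delegate the structural check to Avro itself: can a reader using
>     // readerSchema decode data written with writerSchema?
>     final SchemaPairCompatibility compatibility =
>             SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);
>     switch (compatibility.getType()) {
>         case COMPATIBLE:
>             // The new serializer would be able to read data persisted with *this*
>             // serializer, therefore no migration is required.
>             return TypeSerializerSchemaCompatibility.compatibleAfterMigration(); // current behavior
>             // return TypeSerializerSchemaCompatibility.compatibleAsIs();        // proposed fix
>         default:
>             return TypeSerializerSchemaCompatibility.incompatible();
>     }
> }
> {noformat}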
> This issue leads to Flink failures in the scenario where a new optional field 
> is added to a schema. The following happens in this case:
> # Records in state are deserialized successfully using the old serializer 
> (with the old schema).
> # The schema change triggers state migration due to the code path linked 
> above.
> # As part of the migration, RocksDBKeyedStateBackend attempts to re-serialize 
> a record that was built against the old schema using the new schema.
> # The latter operation fails because the field indexes of the old and the new 
> schemas are incompatible (see the minimal reproduction below).
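>
> A minimal reproduction of steps 3 and 4 in plain Avro (the record and field names are illustrative, not taken from the affected job):
> {noformat}
> import java.io.ByteArrayOutputStream;
> import org.apache.avro.Schema;
> import org.apache.avro.SchemaBuilder;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.io.Encoder;
> import org.apache.avro.io.EncoderFactory;
>
> Schema oldSchema = SchemaBuilder.record("Rec").fields()
>         .requiredString("a")
>         .endRecord();
> Schema newSchema = SchemaBuilder.record("Rec").fields()
>         .requiredString("a")
>         .optionalString("b") // the newly added optional field
>         .endRecord();
>
> // A record restored with the old schema only has storage for its one field.
> GenericRecord oldRecord = new GenericData.Record(oldSchema);
> oldRecord.put("a", "value");
>
> // Migration re-serializes that record with the *new* schema: the writer
> // iterates over the new schema's fields and asks the record for field
> // index 1 ("b"), which the old-schema record does not have.
> GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(newSchema);
> Encoder encoder = EncoderFactory.get().binaryEncoder(new ByteArrayOutputStream(), null);
> writer.write(oldRecord, encoder); // throws ArrayIndexOutOfBoundsException: 1
> {noformat}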
> The failure produces a stack trace like the following:
> {noformat}
> Caused by: org.apache.flink.util.StateMigrationException: Error while trying to migrate RocksDB list state.
>       at org.apache.flink.contrib.streaming.state.RocksDBListState.migrateSerializedValue(RocksDBListState.java:269)
>       at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:630)
>       at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:559)
>       at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:509)
>       at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:670)
>       at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>       at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
>       at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
>       at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
>       at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>       at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
>       ... 18 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 10
>       at org.apache.avro.generic.GenericData$Record.get(GenericData.java:261)
>       at org.apache.avro.generic.GenericData.getField(GenericData.java:825)
>       at org.apache.avro.generic.GenericData.getField(GenericData.java:844)
>       at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:204)
>       at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
>       at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
>       at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>       at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
>       at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>       at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
>       at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
>       at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
>       at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>       at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>       at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:186)
>       at org.apache.flink.contrib.streaming.state.RocksDBListState.migrateSerializedValue(RocksDBListState.java:263)
>       ... 28 more
> {noformat}
> If Flink skipped the migration in this case and simply used the new 
> serializer to deserialize the old records, no such issue would occur.
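>
> Avro's schema resolution already supports that read path directly. A sketch of the "no migration" behavior, reusing the illustrative oldSchema/newSchema/oldRecord from the reproduction above:
> {noformat}
> import java.io.ByteArrayOutputStream;
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.io.BinaryEncoder;
> import org.apache.avro.io.DecoderFactory;
> import org.apache.avro.io.EncoderFactory;
>
> // Bytes persisted with the old schema...
> ByteArrayOutputStream out = new ByteArrayOutputStream();
> BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
> new GenericDatumWriter<GenericRecord>(oldSchema).write(oldRecord, encoder);
> encoder.flush();
>
> // ...can be read with a resolving reader that takes the old schema as the
> // writer schema and the new schema as the reader schema. The added optional
> // field resolves to its default, so no eager rewrite of state is needed.
> GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(oldSchema, newSchema);
> GenericRecord decoded =
>         reader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
> // decoded.get("b") == null (the field's default)
> {noformat}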



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
