[ https://issues.apache.org/jira/browse/FLINK-38023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17987069#comment-17987069 ]

Gabor Somogyi commented on FLINK-38023:
---------------------------------------

Can you please provide a minimal test in Flink, or any code that reproduces the issue?
Has this ever worked, or is it something new? I have the feeling that the Avro 
serializer is not clever enough to handle such migrations.

> State Restoration fails when using GenericRecord with new Avro schema
> ---------------------------------------------------------------------
>
>                 Key: FLINK-38023
>                 URL: https://issues.apache.org/jira/browse/FLINK-38023
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Keshav Kansal
>            Priority: Major
>
> Hi Team,
> I have a Flink job that stores a 
> [GenericRecord|https://github.com/apache/avro/blob/main/lang/java/avro/src/main/java/org/apache/avro/generic/GenericRecord.java]
> in ValueState, using 
> [GenericRecordAvroTypeInfo|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java].
> I am observing an issue while upgrading the Avro schema, which I reproduced locally.
> For example, the schema is initially:
> {code:java}
> {
>   "type" : "record",
>   "name" : "SimpleEvent",
>   "fields" : [ {
>     "name" : "count",
>     "type" : "int"
>   } ]
> }
> {code}
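> For reference, the state is declared roughly like this (a simplified sketch rather than the actual job code; class, field and state names are illustrative):
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> // Illustrative only: keeps the latest GenericRecord per key in ValueState,
> // with the serializer derived from GenericRecordAvroTypeInfo.
> public class SimpleEventStateFunction
>         extends KeyedProcessFunction<String, GenericRecord, GenericRecord> {
>
>     private final String schemaJson; // the "SimpleEvent" schema shown above
>     private transient ValueState<GenericRecord> state;
>
>     public SimpleEventStateFunction(String schemaJson) {
>         this.schemaJson = schemaJson;
>     }
>
>     @Override
>     public void open(Configuration parameters) {
>         Schema schema = new Schema.Parser().parse(schemaJson);
>         state = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("simple-event", new GenericRecordAvroTypeInfo(schema)));
>     }
>
>     @Override
>     public void processElement(GenericRecord value, Context ctx, Collector<GenericRecord> out)
>             throws Exception {
>         state.update(value);
>         out.collect(value);
>     }
> }
> {code}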
> A savepoint is taken with some entities in the state. 
> The app is then restored from that savepoint with the new schema below 
> (note: this schema is backward compatible):
> {code:java}
> {
>   "type" : "record",
>   "name" : "SimpleEvent",
>   "fields" : [ {
>     "name" : "count",
>     "type" : "int"
>   }, {
>     "name" : "strTest",
>     "type" : [ "null", "string" ],
>     "default" : null
>   } ]
> }
> {code}
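> The backward compatibility of this change can be double-checked with Avro's own resolver, for example (illustrative snippet with the two schemas from above inlined):
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.avro.SchemaCompatibility;
>
> public class CompatibilityCheck {
>     public static void main(String[] args) {
>         String oldSchemaJson =
>                 "{\"type\":\"record\",\"name\":\"SimpleEvent\",\"fields\":["
>                         + "{\"name\":\"count\",\"type\":\"int\"}]}";
>         String newSchemaJson =
>                 "{\"type\":\"record\",\"name\":\"SimpleEvent\",\"fields\":["
>                         + "{\"name\":\"count\",\"type\":\"int\"},"
>                         + "{\"name\":\"strTest\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
>
>         Schema oldSchema = new Schema.Parser().parse(oldSchemaJson);
>         Schema newSchema = new Schema.Parser().parse(newSchemaJson);
>
>         // New schema as reader, old schema as writer: the added "strTest" field is an
>         // optional union with a null default, so resolution should report COMPATIBLE.
>         SchemaCompatibility.SchemaPairCompatibility result =
>                 SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);
>         System.out.println(result.getType());
>     }
> }
> {code}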
> While restoring the data from the savepoint, we observe the following exception: 
> {code:java}
> Caused by: org.apache.flink.util.StateMigrationException: Error while trying to migrate RocksDB state.
>       at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.migrateSerializedValue(AbstractRocksDBState.java:194)
>       at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:848)
>       at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:772)
>       at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:684)
>       at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:904)
>       at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:891)
>       at org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47)
>       at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70)
>       at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:361)
>       at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.getOrCreateKeyedState(StreamOperatorStateHandler.java:361)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:517)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:238)
>       at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>       at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
>       at org.apache.avro.generic.GenericData$Record.get(GenericData.java:289)
>       at org.apache.avro.generic.GenericData.getField(GenericData.java:860)
>       at org.apache.avro.generic.GenericData.getField(GenericData.java:879)
>       at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:243)
>       at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234)
>       at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145)
>       at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95)
>       at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>       at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:188)
>       at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.migrateSerializedValue(AbstractRocksDBState.java:192)
>       ... 22 more
> {code}
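> The bottom of the trace suggests what goes wrong: during migration the new serializer, configured with the new two-field schema, re-serializes a record that is still bound to the old one-field schema. The same failure can be reproduced outside Flink with plain Avro (illustrative snippet only):
> {code:java}
> import java.io.ByteArrayOutputStream;
>
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.io.BinaryEncoder;
> import org.apache.avro.io.EncoderFactory;
>
> public class MigrationWriteRepro {
>     public static void main(String[] args) throws Exception {
>         String oldSchemaJson =
>                 "{\"type\":\"record\",\"name\":\"SimpleEvent\",\"fields\":["
>                         + "{\"name\":\"count\",\"type\":\"int\"}]}";
>         String newSchemaJson =
>                 "{\"type\":\"record\",\"name\":\"SimpleEvent\",\"fields\":["
>                         + "{\"name\":\"count\",\"type\":\"int\"},"
>                         + "{\"name\":\"strTest\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
>
>         Schema oldSchema = new Schema.Parser().parse(oldSchemaJson);
>         Schema newSchema = new Schema.Parser().parse(newSchemaJson);
>
>         // A record restored from the savepoint is still bound to the old one-field schema.
>         GenericRecord restored = new GenericData.Record(oldSchema);
>         restored.put("count", 42);
>
>         // Writing it with a datum writer built from the new two-field schema asks the
>         // record for field index 1, which its backing array does not have.
>         GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(newSchema);
>         BinaryEncoder encoder =
>                 EncoderFactory.get().binaryEncoder(new ByteArrayOutputStream(), null);
>         writer.write(restored, encoder); // expected: ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
>     }
> }
> {code}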
> Tested with Flink version 1.20.1



