Keshav Kansal created FLINK-38023:
-------------------------------------

Summary: State Restoration fails when using GenericRecord with new Avro schema
Key: FLINK-38023
URL: https://issues.apache.org/jira/browse/FLINK-38023
Project: Flink
Issue Type: Bug
Reporter: Keshav Kansal
Hi Team,

I have a Flink job that stores a [GenericRecord|https://github.com/apache/avro/blob/main/lang/java/avro/src/main/java/org/apache/avro/generic/GenericRecord.java] in a ValueState, using [GenericRecordAvroTypeInfo|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java] as its type information. I am observing an issue when upgrading the Avro schema, and I have reproduced it locally.

For example, the initial schema is:
{code:java}
{
  "type" : "record",
  "name" : "SimpleEvent",
  "fields" : [ {
    "name" : "count",
    "type" : "int"
  } ]
}
{code}

A savepoint is taken with some entries in the state. The application is then restored from that savepoint with the following new schema, which adds an optional field {{count2}} with a {{null}} default:
{code:java}
{
  "type" : "record",
  "name" : "SimpleEvent",
  "fields" : [ {
    "name" : "count",
    "type" : "int"
  }, {
    "name" : "count2",
    "type" : [ "null", "string" ],
    "default" : null
  } ]
}
{code}

While restoring the state from the savepoint, we observe the following exception:
{code:java}
Caused by: org.apache.flink.util.StateMigrationException: Error while trying to migrate RocksDB state.
	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.migrateSerializedValue(AbstractRocksDBState.java:194)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:848)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:772)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:684)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:904)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:891)
	at org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47)
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70)
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:361)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.getOrCreateKeyedState(StreamOperatorStateHandler.java:361)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:517)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:238)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
	at org.apache.avro.generic.GenericData$Record.get(GenericData.java:289)
	at org.apache.avro.generic.GenericData.getField(GenericData.java:860)
	at org.apache.avro.generic.GenericData.getField(GenericData.java:879)
	at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:243)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234)
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
	at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:188)
	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.migrateSerializedValue(AbstractRocksDBState.java:192)
	... 22 more
{code}

Tested with Flink version 1.20.1.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
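A minimal, Avro-only sketch of the step the innermost frames appear to point at: {{AvroSerializer.serialize}} hands a {{GenericData$Record}} holding a single value to a {{GenericDatumWriter}} that seems to be built for the new two-field schema, so {{Record.get(1)}} fails. The sketch assumes, without having checked the Flink source, that during migration the stored bytes are decoded with the old schema alone and then re-encoded with the new one. The class {{AvroSchemaEvolutionSketch}} and its helpers are hypothetical, the schemas are the two from this report, and it needs {{org.apache.avro:avro}} on the classpath. Path 1 mimics the assumed migration; path 2 instead decodes with {{GenericDatumReader(writerSchema, readerSchema)}} so that Avro schema resolution fills {{count2}} with its {{null}} default before re-encoding.

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

/** Hypothetical, Avro-only illustration (not Flink code). Requires org.apache.avro:avro. */
public class AvroSchemaEvolutionSketch {

    // The two schemas from the report. Each gets its own Parser because both
    // define the same record name "SimpleEvent".
    static final Schema OLD_SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"SimpleEvent\",\"fields\":["
                    + "{\"name\":\"count\",\"type\":\"int\"}]}");
    static final Schema NEW_SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"SimpleEvent\",\"fields\":["
                    + "{\"name\":\"count\",\"type\":\"int\"},"
                    + "{\"name\":\"count2\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    /** Encodes a record with a GenericDatumWriter for the given schema. */
    static byte[] write(Schema schema, GenericRecord record) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
            encoder.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Decodes bytes written with writerSchema, resolving them against readerSchema. */
    static GenericRecord read(byte[] bytes, Schema writerSchema, Schema readerSchema) {
        try {
            return new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
                    .read(null, DecoderFactory.get().binaryDecoder(bytes, null));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Simulates a value that was written to state before the schema upgrade. */
    static byte[] storedOldValue(int count) {
        GenericRecord record = new GenericData.Record(OLD_SCHEMA);
        record.put("count", count);
        return write(OLD_SCHEMA, record);
    }

    public static void main(String[] args) {
        byte[] stored = storedOldValue(42);

        // Path 1 (assumed migration behaviour): decode with the old schema only,
        // then encode with the new schema. The record has one slot; the writer asks for two.
        GenericRecord oldShape = read(stored, OLD_SCHEMA, OLD_SCHEMA);
        try {
            write(NEW_SCHEMA, oldShape);
            System.out.println("path 1: no exception");
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("path 1: " + e);
        }

        // Path 2: resolve the old bytes against the new schema first, which fills
        // count2 with its default (null); re-encoding with the new schema then succeeds.
        GenericRecord resolved = read(stored, OLD_SCHEMA, NEW_SCHEMA);
        System.out.println("path 2: " + resolved + ", " + write(NEW_SCHEMA, resolved).length + " bytes");
    }
}
{code}

If path 1 throws the same {{Index 1 out of bounds for length 1}} while path 2 succeeds, that suggests the stored bytes are compatible with the new schema under Avro's resolution rules and that the failure lies in how the migrated value is decoded before being rewritten.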