Hello! I have a Flink job that reads a stream of JSON events and converts them to Avro, and a second job that reads this Avro event stream, does some processing, and produces a Session Avro object. The schema of both the Event and Session Avro objects has changed, and I want to restart both jobs from a savepoint with this change. I am able to start the first job (the one that reads JSON) from its savepoint, but the second job, which consumes the Avro objects whose schema changed, cannot resume from its savepoint and fails with the RocksDB error shown below.
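For context, the second job is essentially a Kafka source feeding a keyed window with a reduce, running on the RocksDB state backend. Here is a simplified sketch of its shape; `SessionEvent` stands in for our generated Avro class, `SessionEventSchema` for our `DeserializationSchema`, and the topic, key field, bucket, and window size are placeholders, not the real values:

```java
import java.util.Properties;

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class SessionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keyed window state (the ReducingState in the trace below) lives in RocksDB.
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints"));

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "session-job");

        // SessionEvent is the Avro-generated class whose schema changed.
        DataStream<SessionEvent> events = env.addSource(
                new FlinkKafkaConsumer010<>("avro_events", new SessionEventSchema(), props));

        events
                .keyBy("session_id")
                .window(TumblingProcessingTimeWindows.of(Time.minutes(30)))
                // Placeholder merge; the real job folds events into a session.
                // This reduce is what writes SessionEvent objects into RocksDB,
                // and the restore blows up when it reads the old bytes back.
                .reduce((a, b) -> b)
                .print();

        env.execute("session-job");
    }
}
```

Here is the full stack trace from the restore attempt: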
```
java.lang.RuntimeException: Error while adding data to RocksDB
	at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:116)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:409)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 115, Size: 3
Serialization trace:
brand (com.rfk.dataplatform.avro.ProductData)
event_data (com.rfk.dataplatform.avro.SessionEvent)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
	at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:109)
	... 6 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 115, Size: 3
	at java.util.ArrayList.rangeCheck(ArrayList.java:653)
	at java.util.ArrayList.get(ArrayList.java:429)
	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
	at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
	... 17 more
```
Since this looked like a Kryo serialization error, I tried to force Avro serialization with `env.getConfig().enableForceAvro()`, but this didn't work either and failed with the following error (the same trace, now with a plain `ArrayIndexOutOfBoundsException` as the cause):

```
java.lang.RuntimeException: Error while adding data to RocksDB
	at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:116)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:409)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException
```

The failing task is:

```
avro_events_to_raw_user_session_transformer -> (Sink: raw_user_sessions_s3_sink, Flat Map -> FlinKafkaProducer 0.10.x, Flat Map -> FlinKafkaProducer 0.10.x, raw_session_to_user_session_aggregate_transformer -> FlinKafkaProducer 0.10.x, Flat Map -> FlinKafkaProducer 0.10.x, Session-id-information -> FlinKafkaProducer 0.10.x) (1/1) ip-10-20-9-32.ec2.internal:37587
```

It would be really helpful if someone could point out what I am doing wrong, or how I should go about this, as restarting jobs from a savepoint after a schema change seems like a very common situation. Any help would be appreciated!

Cheers,
Dhvanan Shah