Hello!

I have a Flink job that reads a stream of Event JSON and converts it to Avro
format, and a second job that reads this Avro Event stream, does some
processing, and aggregates the events into a Session Avro object.
I have changed the schema of both the Event and Session Avro objects, and I
want to restart both jobs from a savepoint with this change.
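
For reference, the second job consumes the Avro Event stream and reduces
events per key over a session window into the Session object. A minimal
sketch (`EventAvroDeserializationSchema`, `MergeSessionEvents`, and
`ToSessionAvro` are illustrative stand-ins for our actual classes;
timestamp/watermark assignment is omitted):

```java
import java.util.Properties;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class SessionJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties kafkaProps = new Properties(); // broker and consumer-group settings omitted

        // Read the Avro Event stream produced by the first job.
        DataStream<SessionEvent> events = env.addSource(
                new FlinkKafkaConsumer010<>("avro-events", new EventAvroDeserializationSchema(), kafkaProps));

        DataStream<SessionEvent> merged = events
                .keyBy(new KeySelector<SessionEvent, String>() {
                    @Override
                    public String getKey(SessionEvent e) {
                        return e.getSessionId().toString();
                    }
                })
                .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
                // The ReducingState behind this reduce is the RocksDB state
                // that fails to deserialize after the schema change.
                .reduce(new MergeSessionEvents());

        // Convert the merged events into the Session Avro object.
        DataStream<Session> sessions = merged.map(new ToSessionAvro());

        env.execute("session-job");
    }
}
```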
I am able to start the first job, which reads the JSON data, from its
savepoint. But the second job, which reads the Avro objects whose schema
has changed, cannot resume from its savepoint and fails with this RocksDB
error:

java.lang.RuntimeException: Error while adding data to RocksDB
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 115, Size: 3

Error:
Root exception
java.lang.RuntimeException: Error while adding data to RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:116)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:409)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 115, Size: 3
Serialization trace:
brand (com.rfk.dataplatform.avro.ProductData)
event_data (com.rfk.dataplatform.avro.SessionEvent)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:109)
    ... 6 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 115, Size: 3
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
    ... 17 more
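
Since the failure is in Kryo deserialization, I tried to force Avro
serialization with `env.getConfig().enableForceAvro()`. This is the only
change I made to the second job (a minimal sketch; the rest of the
environment setup is unchanged):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Ask Flink to serialize data types with Avro instead of falling back to Kryo.
env.getConfig().enableForceAvro();
```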



This too didn't work, and the job failed with the following error:

Root exception
java.lang.RuntimeException: Error while adding data to RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:116)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:409)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException
The same exception is reported for the task
avro_events_to_raw_user_session_transformer -> (Sink: raw_user_sessions_s3_sink, Flat Map -> FlinKafkaProducer 0.10.x, Flat Map -> FlinKafkaProducer 0.10.x, raw_session_to_user_session_aggregate_transformer -> FlinKafkaProducer 0.10.x, Flat Map -> FlinKafkaProducer 0.10.x, Session-id-information -> FlinKafkaProducer 0.10.x) (1/1)
on ip-10-20-9-32.ec2.internal:37587.


It would be really helpful if someone could point out what I might be doing
wrong, or how I should go about this. This seems like a very common
situation: you want to restart jobs from a savepoint after a schema change.
Any help would be appreciated!

Cheers,
Dhvanan Shah
