[ 
https://issues.apache.org/jira/browse/FLINK-28653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peleg Tsadok updated FLINK-28653:
---------------------------------
    Description: 
I am building a proof of concept (POC) of Flink state schema evolution. I am 
using Flink 1.15.0 with Java 11, and I also tested on Flink 1.14.3.
I created three data classes, one for each serialization type:
1. `io.peleg.kryo.User` - Uses `java.time.Instant` class which I know is not 
supported for POJO serialization in Flink.
2. `io.peleg.pojo.User` - Uses only classic wrapped primitives - `Integer`, 
`Long`, `String`. The getters, setters and constructors are generated using 
Lombok.
3. `io.peleg.avro.User` - Generated from Avro schema using Avro Maven Plugin.
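
For reference, Flink's documentation describes a POJO as a public, standalone class with a public no-argument constructor whose fields are either public or reachable through getters and setters. The sketch below is a rough, reflection-based approximation of those rules, not Flink's actual type extraction. `PojoShapeCheck`, `UserWithoutDefaultCtor`, and the `name` and `favoriteNumber` fields are hypothetical names chosen for illustration, and the check only looks at fields declared directly on the class.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

/** Rough approximation of Flink's documented POJO rules (illustrative only). */
final class PojoShapeCheck {

    /** Hypothetical data class shaped like the POJO variant described above. */
    public static class User {
        private String name;
        private Integer favoriteNumber;

        public User() {} // public no-argument constructor

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Integer getFavoriteNumber() { return favoriteNumber; }
        public void setFavoriteNumber(Integer favoriteNumber) { this.favoriteNumber = favoriteNumber; }
    }

    /** Counter-example: no no-argument constructor and no setter. */
    public static class UserWithoutDefaultCtor {
        private final String name;

        public UserWithoutDefaultCtor(String name) { this.name = name; }

        public String getName() { return name; }
    }

    /** Returns true if the class satisfies the simplified rules listed in the lead-in. */
    static boolean looksLikePojo(Class<?> type) {
        int classModifiers = type.getModifiers();
        if (!Modifier.isPublic(classModifiers)) {
            return false;
        }
        // A non-static inner class cannot be instantiated on its own.
        if (type.isMemberClass() && !Modifier.isStatic(classModifiers)) {
            return false;
        }
        try {
            type.getConstructor(); // looks up a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {
            int m = field.getModifiers();
            if (Modifier.isStatic(m) || Modifier.isTransient(m) || field.isSynthetic()) {
                continue; // these fields are not part of the serialized shape
            }
            if (!Modifier.isPublic(m) && !hasGetterAndSetter(type, field)) {
                return false;
            }
        }
        return true;
    }

    /** Checks for public getX() and setX(fieldType) methods matching the field name. */
    private static boolean hasGetterAndSetter(Class<?> type, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            type.getMethod("get" + suffix);
            type.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

A check like this can help when constructors are generated by an annotation processor, because it inspects the compiled class rather than the source. It says nothing about which serializer Flink finally picks for a given state.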

For each class I wrote a stream job that uses a time window to buffer elements 
and turn them into a list.

For each class I tried to do the following:
1. Run a job
2. Stop with savepoint
3. Add a field to the data class
4. Submit using savepoint
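
As background for step 3, the toy example below sketches why adding a field can break a restore when the stored bytes carry no schema information, and how a reader that knows the writer's field list can fill in the missing field instead. This is a simplified illustration and not the wire format of Flink, Kryo, or Avro. The class `SchemaEvolutionSketch` and the fields `name` and `favoriteNumber` are assumptions; only `favoriteColor` appears in this report.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of positional versus schema-aware decoding (illustrative only). */
final class SchemaEvolutionSketch {

    /** Writes a version-1 record: field values only, in declaration order. */
    static byte[] writeV1(String name, int favoriteNumber) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(name);
            out.writeInt(favoriteNumber);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Version-2 reader that assumes its own layout, including the new field. */
    static Map<String, Object> readAssumingV2Layout(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            Map<String, Object> record = new LinkedHashMap<>();
            record.put("name", in.readUTF());
            record.put("favoriteNumber", in.readInt());
            record.put("favoriteColor", in.readUTF()); // a v1 record has no such bytes
            return record;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reader that decodes with the writer's field list, then maps onto the reader's fields. */
    static Map<String, Object> readWithWriterSchema(
            byte[] data, List<String> writerFields, List<String> readerFields) {
        Map<String, Object> written = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            for (String field : writerFields) {
                if (field.equals("favoriteNumber")) {
                    written.put(field, in.readInt());
                } else {
                    written.put(field, in.readUTF());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, Object> record = new LinkedHashMap<>();
        for (String field : readerFields) {
            record.put(field, written.get(field)); // fields the writer did not know become null
        }
        return record;
    }
}
```

In this toy case the positional reader runs off the end of the single record. When more data follows the record, a positional reader would instead consume bytes that belong to something else, which is one plausible way to end up with a nonsensical value during a restore.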

For all three data classes, resubmitting the job from the savepoint failed with 
this exception:
{code:java}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_3983d6bb2f0a45b638461bc99138f22f_(2/2) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
    ... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:172)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:106)
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:143)
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:74)
    at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:140)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 13 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index 83 out of bounds for length 3
Serialization trace:
favoriteColor (io.peleg.avro.User)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKVStateData(HeapSavepointRestoreOperation.java:219)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.readKeyGroupStateData(HeapSavepointRestoreOperation.java:149)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:125)
    at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:57)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:169)
    ... 20 more
Caused by: java.lang.IndexOutOfBoundsException: Index 83 out of bounds for length 3
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
    ... 31 more
{code}
 

I expected this exception for `io.peleg.kryo.User`, since Flink does not 
support state schema evolution for Kryo-serialized classes.

However, all three classes appear to have been serialized with Kryo instead of 
the POJO or Avro serializers. The serialization trace above, for example, shows 
`io.peleg.avro.User` being read by Kryo's `FieldSerializer`.

My entire code is public on GitHub 
[here|https://github.com/peleg68/flink-state-schema-evolution].

What I would like to achieve is to successfully run a job from a savepoint taken 
with an older version of a POJO that has fewer or more fields.
 

> State Schema Evolution does not work - Flink defaults to Kryo serialization 
> even for POJOs and Avro SpecificRecords
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28653
>                 URL: https://issues.apache.org/jira/browse/FLINK-28653
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System, Runtime / State Backends
>    Affects Versions: 1.14.3, 1.15.0
>         Environment: I ran the job on a Flink cluster I spun up using docker 
> compose:
> ```
> version: "2.2"
> services:
>   jobmanager:
>     image: flink:latest
>     ports:
>       - "8081:8081"
>     command: jobmanager
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>   taskmanager:
>     image: flink:latest
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     scale: 1
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>         taskmanager.numberOfTaskSlots: 2
> ```
>  My machine is a MacBook Pro (14-inch, 2021) with the Apple M1 Pro chip.
> I'm running macOS Monterey Version 12.4.
>            Reporter: Peleg Tsadok
>            Priority: Major
>              Labels: KryoSerializer, State, avro, pojo, schema-evolution
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
