[ https://issues.apache.org/jira/browse/FLINK-27218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yue Ma updated FLINK-27218:
---------------------------
    Description: 
Operator state such as *BroadcastState* or *PartitionableListState* can only be
constructed via {*}DefaultOperatorStateBackend{*}. However, when the serializer
of a *BroadcastState* or *PartitionableListState* changes after the job is
restarted, the following problem seems to occur.

Take the initialization of PartitionableListState as an example.

First, RestoreOperation constructs a restored PartitionableListState based
on the information in the snapshot.

Then the StateMetaInfo in partitionableListState is updated, as in the
following code:
{code:java}
TypeSerializerSchemaCompatibility<S> stateCompatibility =
        restoredPartitionableListStateMetaInfo.updatePartitionStateSerializer(
                newPartitionStateSerializer);

partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);
{code}
The main problem is that *PartitionableListState* also holds an
*internalListCopySerializer*, which is built from the previous serializer and
is not updated.

Therefore, when we update the StateMetaInfo, the internalListCopySerializer 
also needs to be updated.
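
To make the stale-serializer situation concrete, here is a minimal,
self-contained sketch. It is not Flink code: ElementSerializer,
ListCopySerializer, MetaInfo, ListState and the refreshCopySerializer flag are
hypothetical stand-ins that only mirror the shape described above (a copy
serializer derived once in the constructor, and a setter that may or may not
rebuild it).

```java
public class StaleCopySerializerSketch {

    /** Hypothetical stand-in for an element serializer; only a version tag is modelled. */
    static final class ElementSerializer {
        final String version;

        ElementSerializer(String version) {
            this.version = version;
        }
    }

    /** Hypothetical stand-in for the list serializer derived from an element serializer. */
    static final class ListCopySerializer {
        final ElementSerializer elementSerializer;

        ListCopySerializer(ElementSerializer elementSerializer) {
            this.elementSerializer = elementSerializer;
        }
    }

    /** Hypothetical stand-in for the state meta info that owns the current serializer. */
    static final class MetaInfo {
        private ElementSerializer partitionStateSerializer;

        MetaInfo(ElementSerializer partitionStateSerializer) {
            this.partitionStateSerializer = partitionStateSerializer;
        }

        ElementSerializer getPartitionStateSerializer() {
            return partitionStateSerializer;
        }

        void updatePartitionStateSerializer(ElementSerializer newSerializer) {
            this.partitionStateSerializer = newSerializer;
        }
    }

    /** Hypothetical stand-in for a list state that keeps a derived copy serializer. */
    static final class ListState {
        private MetaInfo stateMetaInfo;
        private ListCopySerializer internalListCopySerializer;
        private final boolean refreshCopySerializer;

        ListState(MetaInfo stateMetaInfo, boolean refreshCopySerializer) {
            this.stateMetaInfo = stateMetaInfo;
            this.refreshCopySerializer = refreshCopySerializer;
            // Derived once from the serializer known at construction time.
            this.internalListCopySerializer =
                    new ListCopySerializer(stateMetaInfo.getPartitionStateSerializer());
        }

        void setStateMetaInfo(MetaInfo stateMetaInfo) {
            this.stateMetaInfo = stateMetaInfo;
            if (refreshCopySerializer) {
                // Suggested behaviour: rebuild the derived serializer as well.
                this.internalListCopySerializer =
                        new ListCopySerializer(stateMetaInfo.getPartitionStateSerializer());
            }
        }

        String copySerializerVersion() {
            return internalListCopySerializer.elementSerializer.version;
        }
    }

    /** Replays the restore sequence: build from snapshot (v1), then switch to v2. */
    static String versionAfterRestore(boolean refreshCopySerializer) {
        MetaInfo restored = new MetaInfo(new ElementSerializer("v1"));
        ListState state = new ListState(restored, refreshCopySerializer);
        restored.updatePartitionStateSerializer(new ElementSerializer("v2"));
        state.setStateMetaInfo(restored);
        return state.copySerializerVersion();
    }

    public static void main(String[] args) {
        System.out.println("without refresh: " + versionAfterRestore(false)); // v1 (stale)
        System.out.println("with refresh:    " + versionAfterRestore(true));  // v2
    }
}
```

In this sketch the meta info reports v2 in both runs, but without the refresh
the derived copy serializer still wraps v1, which is the mismatch this issue
describes.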

 

  was:
Operator state such as *BroadcastState* or *PartitionableListState* can only be
constructed via {*}DefaultOperatorStateBackend{*}. However, when the serializer
of a *BroadcastState* or *PartitionableListState* changes after the job is
restarted, the following problem seems to occur.

Take the initialization of PartitionableListState as an example.

First, RestoreOperation constructs a restored PartitionableListState based
on the information in the snapshot.

Then the StateMetaInfo in partitionableListState is updated, as in the
following code:
{code:java}
TypeSerializerSchemaCompatibility<S> stateCompatibility =
        restoredPartitionableListStateMetaInfo.updatePartitionStateSerializer(
                newPartitionStateSerializer);

partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);
{code}
The main problem is that *PartitionableListState* also holds an
*internalListCopySerializer*, which is built from the previous serializer and
is not updated.

However, internalListCopySerializer is still used later.

Therefore, when we update the StateMetaInfo, the internalListCopySerializer
also needs to be updated.

This problem also exists in BroadcastState.


> Serializer in OperatorState has not been updated when new Serializers are NOT 
> incompatible
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-27218
>                 URL: https://issues.apache.org/jira/browse/FLINK-27218
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.15.1
>            Reporter: Yue Ma
>            Priority: Major
>         Attachments: image-2022-04-13-14-50-10-921.png
>
>
> Operator state such as *BroadcastState* or *PartitionableListState* can only
> be constructed via {*}DefaultOperatorStateBackend{*}. However, when the
> serializer of a *BroadcastState* or *PartitionableListState* changes after
> the job is restarted, the following problem seems to occur.
> Take the initialization of PartitionableListState as an example.
> First, RestoreOperation constructs a restored PartitionableListState
> based on the information in the snapshot.
> Then the StateMetaInfo in partitionableListState is updated, as in the
> following code:
> {code:java}
> TypeSerializerSchemaCompatibility<S> stateCompatibility =
>         restoredPartitionableListStateMetaInfo.updatePartitionStateSerializer(
>                 newPartitionStateSerializer);
> partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);
> {code}
> The main problem is that *PartitionableListState* also holds an
> *internalListCopySerializer*, which is built from the previous serializer
> and is not updated.
> Therefore, when we update the StateMetaInfo, the internalListCopySerializer 
> also needs to be updated.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
