[ https://issues.apache.org/jira/browse/FLINK-8715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16452457#comment-16452457 ]

ASF GitHub Bot commented on FLINK-8715:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184101616
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1125,59 +1125,62 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance(
         * that we checkpointed, i.e. is already in the map of column families.
         */
        @SuppressWarnings("rawtypes, unchecked")
    -   protected <N, S> ColumnFamilyHandle getColumnFamily(
    +   protected <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> getColumnFamilyAndStateSerializer(
                StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {
     
                Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
                        kvStateInformation.get(descriptor.getName());
     
    -           RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    -                   descriptor.getType(),
    -                   descriptor.getName(),
    -                   namespaceSerializer,
    -                   descriptor.getSerializer());
    -
                if (stateInfo != null) {
                        // TODO with eager registration in place, these checks should be moved to restore()
     
                        RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
                                (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName());
     
                        Preconditions.checkState(
    -                           Objects.equals(newMetaInfo.getName(), restoredMetaInfo.getName()),
    +                           Objects.equals(descriptor.getName(), restoredMetaInfo.getName()),
                                "Incompatible state names. " +
                                        "Was [" + restoredMetaInfo.getName() + "], " +
    -                                   "registered with [" + newMetaInfo.getName() + "].");
    +                                   "registered with [" + descriptor.getName() + "].");
     
    -                   if (!Objects.equals(newMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)
    +                   if (!Objects.equals(descriptor.getType(), StateDescriptor.Type.UNKNOWN)
                                && !Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) {
     
                                Preconditions.checkState(
    -                                   newMetaInfo.getStateType() == restoredMetaInfo.getStateType(),
    +                                   descriptor.getType() == restoredMetaInfo.getStateType(),
                                        "Incompatible state types. " +
                                                "Was [" + restoredMetaInfo.getStateType() + "], " +
    -                                           "registered with [" + newMetaInfo.getStateType() + "].");
    +                                           "registered with [" + descriptor.getType() + "].");
                        }
     
                        // check compatibility results to determine if state migration is required
    +                   TypeSerializer<N> newNamespaceSerializer = namespaceSerializer.duplicate();
                        CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
                                restoredMetaInfo.getNamespaceSerializer(),
                                null,
                                restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
    -                           newMetaInfo.getNamespaceSerializer());
    +                           newNamespaceSerializer);
     
    +                   TypeSerializer<S> newStateSerializer = descriptor.getSerializer().duplicate();
    --- End diff --
    
    The `duplicate()` call here looks redundant: the serializer comes from the 
descriptor, and `descriptor.getSerializer()` already returns a duplicate.
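
To illustrate the point, here is a minimal sketch. It uses hypothetical stand-in classes (`SketchSerializer`, `SketchDescriptor`), not the Flink types, and assumes only that the descriptor's getter hands out a duplicate on every call. The counter exists purely for the illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class DuplicateSketch {

    /** Counts how many serializer copies were created (illustration only). */
    static final AtomicInteger COPIES = new AtomicInteger();

    /** Hypothetical stand-in for a serializer; not Flink's TypeSerializer. */
    static final class SketchSerializer {
        SketchSerializer duplicate() {
            COPIES.incrementAndGet();
            return new SketchSerializer();
        }
    }

    /** Hypothetical stand-in for a descriptor whose getter already duplicates. */
    static final class SketchDescriptor {
        private final SketchSerializer serializer = new SketchSerializer();

        SketchSerializer getSerializer() {
            // Assumption: every call returns a fresh copy of the serializer.
            return serializer.duplicate();
        }
    }

    /** Pattern from the diff: duplicate the result of a duplicating getter. */
    static int copiesWithExtraDuplicate() {
        COPIES.set(0);
        new SketchDescriptor().getSerializer().duplicate();
        return COPIES.get();
    }

    /** Suggested pattern: rely on the copy the getter already made. */
    static int copiesWithoutExtraDuplicate() {
        COPIES.set(0);
        new SketchDescriptor().getSerializer();
        return COPIES.get();
    }

    public static void main(String[] args) {
        System.out.println("with extra duplicate(): " + copiesWithExtraDuplicate());
        System.out.println("without extra duplicate(): " + copiesWithoutExtraDuplicate());
    }
}
```

Under that assumption the extra call only produces a second copy that replaces the first, so dropping it does not change which instance is shared with the descriptor.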


> RocksDB does not propagate reconfiguration of serializer to the states
> ----------------------------------------------------------------------
>
>                 Key: FLINK-8715
>                 URL: https://issues.apache.org/jira/browse/FLINK-8715
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Arvid Heise
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> Any changes made to the serializer in #ensureCompatibility are lost during 
> state creation.
> In particular, 
> [https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java#L68]
>  always uses a fresh copy of the StateDescriptor.
> An easy fix is to pass the reconfigured serializer as an additional parameter 
> in 
> [https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L1681].
> The reconfigured serializer can be retrieved through the side output of 
> getColumnFamily:
> {code:java}
> kvStateInformation.get(stateDesc.getName()).f1.getStateSerializer()
> {code}
> I encountered it in 1.3.2, but the code on master appears unchanged (hence 
> the pointer into master). I encountered it with ValueState, but I suspect the 
> same issue can be observed for all kinds of RocksDB state.
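
The failure mode described in the issue can be sketched in isolation. Everything below is a hypothetical stand-in (`SketchSerializer`, `SketchDescriptor`, `SketchBackend`), not Flink code. It assumes that the compatibility check mutates the serializer copy held by the backend and that the descriptor hands out a fresh, unreconfigured copy on each call.

```java
import java.util.HashMap;
import java.util.Map;

public class ReconfiguredSerializerSketch {

    /** Hypothetical serializer whose configuration can change during a compatibility check. */
    static final class SketchSerializer {
        private boolean reconfigured;

        SketchSerializer duplicate() {
            SketchSerializer copy = new SketchSerializer();
            copy.reconfigured = this.reconfigured;
            return copy;
        }

        /** Stand-in for the reconfiguration done while ensuring compatibility. */
        void ensureCompatibility() {
            reconfigured = true;
        }

        boolean isReconfigured() {
            return reconfigured;
        }
    }

    /** Hypothetical descriptor that returns a fresh copy of its original serializer. */
    static final class SketchDescriptor {
        private final String name;
        private final SketchSerializer serializer = new SketchSerializer();

        SketchDescriptor(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }

        SketchSerializer getSerializer() {
            return serializer.duplicate();
        }
    }

    /** Hypothetical backend that keeps the reconfigured serializer per state name. */
    static final class SketchBackend {
        private final Map<String, SketchSerializer> registered = new HashMap<>();

        void register(SketchDescriptor descriptor) {
            SketchSerializer s = descriptor.getSerializer();
            s.ensureCompatibility(); // reconfigures only this copy
            registered.put(descriptor.getName(), s);
        }

        /** Reported behavior: the state asks the descriptor again and gets a fresh copy. */
        SketchSerializer serializerFromDescriptor(SketchDescriptor descriptor) {
            return descriptor.getSerializer();
        }

        /** Proposed fix: hand the registered, reconfigured serializer to the state. */
        SketchSerializer serializerFromRegistry(SketchDescriptor descriptor) {
            return registered.get(descriptor.getName());
        }
    }

    static boolean descriptorCopyIsReconfigured() {
        SketchBackend backend = new SketchBackend();
        SketchDescriptor descriptor = new SketchDescriptor("value-state");
        backend.register(descriptor);
        return backend.serializerFromDescriptor(descriptor).isReconfigured();
    }

    static boolean registryCopyIsReconfigured() {
        SketchBackend backend = new SketchBackend();
        SketchDescriptor descriptor = new SketchDescriptor("value-state");
        backend.register(descriptor);
        return backend.serializerFromRegistry(descriptor).isReconfigured();
    }

    public static void main(String[] args) {
        System.out.println("from descriptor: " + descriptorCopyIsReconfigured());
        System.out.println("from registry: " + registryCopyIsReconfigured());
    }
}
```

In this sketch the copy taken from the descriptor never sees the reconfiguration, while the copy kept by the backend does, which is why the issue suggests passing the latter into state creation.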



