GitHub user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184101616
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1125,59 +1125,62 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance(
         * that we checkpointed, i.e. is already in the map of column families.
         */
        @SuppressWarnings("rawtypes, unchecked")
    -   protected <N, S> ColumnFamilyHandle getColumnFamily(
    +   protected <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> getColumnFamilyAndStateSerializer(
                StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {
     
                Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
                        kvStateInformation.get(descriptor.getName());
     
    -           RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    -                   descriptor.getType(),
    -                   descriptor.getName(),
    -                   namespaceSerializer,
    -                   descriptor.getSerializer());
    -
                if (stateInfo != null) {
                        // TODO with eager registration in place, these checks should be moved to restore()
     
                        RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
                                (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName());
     
                        Preconditions.checkState(
    -                           Objects.equals(newMetaInfo.getName(), restoredMetaInfo.getName()),
    +                           Objects.equals(descriptor.getName(), restoredMetaInfo.getName()),
                                "Incompatible state names. " +
                                        "Was [" + restoredMetaInfo.getName() + "], " +
    -                                   "registered with [" + newMetaInfo.getName() + "].");
    +                                   "registered with [" + descriptor.getName() + "].");
     
    -                   if (!Objects.equals(newMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)
    +                   if (!Objects.equals(descriptor.getType(), StateDescriptor.Type.UNKNOWN)
                                && !Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) {
     
                                Preconditions.checkState(
    -                                   newMetaInfo.getStateType() == restoredMetaInfo.getStateType(),
    +                                   descriptor.getType() == restoredMetaInfo.getStateType(),
                                        "Incompatible state types. " +
                                                "Was [" + restoredMetaInfo.getStateType() + "], " +
    -                                           "registered with [" + newMetaInfo.getStateType() + "].");
    +                                           "registered with [" + descriptor.getType() + "].");
                        }
     
                        // check compatibility results to determine if state migration is required
    +                   TypeSerializer<N> newNamespaceSerializer = namespaceSerializer.duplicate();
                        CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
                                restoredMetaInfo.getNamespaceSerializer(),
                                null,
                                restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
    -                           newMetaInfo.getNamespaceSerializer());
    +                           newNamespaceSerializer);
     
    +                   TypeSerializer<S> newStateSerializer = descriptor.getSerializer().duplicate();
    --- End diff --
    
    The `duplicate()` call here looks redundant: the serializer comes from `descriptor.getSerializer()`, which already returns a duplicate.
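
To illustrate the point, here is a minimal, self-contained sketch. It assumes, as the comment above states, that the descriptor's getter already returns a duplicate. `SketchSerializer`, `SketchDescriptor`, and the copy counter are hypothetical stand-ins written for this example; they are not Flink classes and do not reproduce the code in the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a stateful serializer; not Flink's TypeSerializer. */
final class SketchSerializer {
    // Counts created copies so that a redundant copy becomes visible.
    static final AtomicInteger COPIES = new AtomicInteger();

    SketchSerializer duplicate() {
        COPIES.incrementAndGet();
        return new SketchSerializer();
    }
}

/** Hypothetical stand-in for a descriptor whose getter already hands out a copy. */
final class SketchDescriptor {
    private final SketchSerializer serializer = new SketchSerializer();

    SketchSerializer getSerializer() {
        // Assumption taken from the review comment: the getter duplicates.
        return serializer.duplicate();
    }
}

final class RedundantDuplicateSketch {
    /** Mirrors the pattern in the diff: getter plus an explicit duplicate(). */
    static int copiesWithExtraDuplicate() {
        SketchSerializer.COPIES.set(0);
        SketchSerializer s = new SketchDescriptor().getSerializer().duplicate();
        return SketchSerializer.COPIES.get();
    }

    /** The suggested simplification: rely on the copy the getter returns. */
    static int copiesWithoutExtraDuplicate() {
        SketchSerializer.COPIES.set(0);
        SketchSerializer s = new SketchDescriptor().getSerializer();
        return SketchSerializer.COPIES.get();
    }

    public static void main(String[] args) {
        System.out.println("with extra duplicate():    " + copiesWithExtraDuplicate());
        System.out.println("without extra duplicate(): " + copiesWithoutExtraDuplicate());
    }
}
```

In this sketch the explicit `duplicate()` creates a second copy that nothing else needs, so dropping it leaves the caller with an equally independent instance. Whether the same holds for `namespaceSerializer.duplicate()` depends on where that serializer comes from, which the diff does not show.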

