Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6325#discussion_r202289229
  
    --- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
    @@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
                return Tuple2.of(stateInfo.f0, newMetaInfo);
        }
     
    +   private <N, S extends State, SV> RegisteredKeyedBackendStateMetaInfo<N, 
SV> migrateStateIfNecessary(
    +                   StateDescriptor<S, SV> stateDesc,
    +                   TypeSerializer<N> namespaceSerializer,
    +                   Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo) throws Exception {
    +
    +           @SuppressWarnings("unchecked")
    +           RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV> 
restoredMetaInfoSnapshot =
    +                   (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>) 
restoredKvStateMetaInfos.get(
    +                           stateDesc.getName());
    +
    +           Preconditions.checkState(
    +                   restoredMetaInfoSnapshot != null,
    +                   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
    +                           " but its corresponding restored snapshot 
cannot be found.");
    +
    +           StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, 
stateDesc);
    +
    +           TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
    +
    +           RegisteredKeyedBackendStateMetaInfo<N, SV> newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
    +                   stateDesc.getType(),
    +                   stateDesc.getName(),
    +                   namespaceSerializer,
    +                   stateSerializer);
    +
    +           // check compatibility results to determine if state migration 
is required
    +           TypeSerializerSchemaCompatibility<N> namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
    +                   
restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
    +                   namespaceSerializer);
    +
    +           TypeSerializerSchemaCompatibility<SV> stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
    +                   
restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
    +                   stateSerializer);
    +
    +           if (namespaceCompatibility.isIncompatible()) {
    +                   throw new UnsupportedOperationException(
    +                           "Changing the namespace TypeSerializer in an 
incompatible way is currently not supported.");
    +           }
    +
    +           if (stateCompatibility.isIncompatible()) {
    --- End diff --
    
    The handling of this branch could maybe go to it's own private method to 
break down this big monolithic method a bit.


---

Reply via email to