Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6325#discussion_r202293477
  
    --- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
    @@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
                return Tuple2.of(stateInfo.f0, newMetaInfo);
        }
     
    +   private <N, S extends State, SV> RegisteredKeyedBackendStateMetaInfo<N, 
SV> migrateStateIfNecessary(
    +                   StateDescriptor<S, SV> stateDesc,
    +                   TypeSerializer<N> namespaceSerializer,
    +                   Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo) throws Exception {
    +
    +           @SuppressWarnings("unchecked")
    +           RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV> 
restoredMetaInfoSnapshot =
    +                   (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>) 
restoredKvStateMetaInfos.get(
    +                           stateDesc.getName());
    +
    +           Preconditions.checkState(
    +                   restoredMetaInfoSnapshot != null,
    +                   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
    +                           " but its corresponding restored snapshot 
cannot be found.");
    +
    +           StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, 
stateDesc);
    +
    +           TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
    +
    +           RegisteredKeyedBackendStateMetaInfo<N, SV> newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
    +                   stateDesc.getType(),
    +                   stateDesc.getName(),
    +                   namespaceSerializer,
    +                   stateSerializer);
    +
    +           // check compatibility results to determine if state migration 
is required
    +           TypeSerializerSchemaCompatibility<N> namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
    +                   
restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
    +                   namespaceSerializer);
    +
    +           TypeSerializerSchemaCompatibility<SV> stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
    +                   
restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
    +                   stateSerializer);
    +
    +           if (namespaceCompatibility.isIncompatible()) {
    +                   throw new UnsupportedOperationException(
    +                           "Changing the namespace TypeSerializer in an 
incompatible way is currently not supported.");
    +           }
    +
    +           if (stateCompatibility.isIncompatible()) {
    +                   if 
(stateDesc.getType().equals(StateDescriptor.Type.MAP)) {
    +                           throw new UnsupportedOperationException(
    +                                   "Changing the TypeSerializers of a 
MapState in an incompatible way is currently not supported.");
    +                   }
    +
    +                   LOG.info(
    +                           "Performing state migration for state {} 
because the state serializer changed in an incompatible way.",
    +                           stateDesc);
    +
    +                   // we need to get an actual state instance because 
migration is different
    +                   // for different state types. For example, ListState 
needs to deal with
    +                   // individual elements
    +                   StateFactory stateFactory = 
STATE_FACTORIES.get(stateDesc.getClass());
    +                   if (stateFactory == null) {
    +                           String message = String.format("State %s is not 
supported by %s",
    +                                   stateDesc.getClass(), this.getClass());
    +                           throw new FlinkRuntimeException(message);
    +                   }
    +
    +                   State state = stateFactory.createState(
    +                           stateDesc,
    +                           Tuple2.of(stateInfo.f0, newMetaInfo),
    +                           RocksDBKeyedStateBackend.this);
    +
    +                   if (!(state instanceof AbstractRocksDBState)) {
    +                           throw new FlinkRuntimeException(
    +                                   "State should be an 
AbstractRocksDBState but is " + state);
    +                   }
    +
    +                   AbstractRocksDBState rocksDBState = 
(AbstractRocksDBState<?, N, ?, S>) state;
    +
    +                   Snapshot rocksDBSnapshot = null;
    +                   RocksIteratorWrapper iterator = null;
    +
    +                   try (ReadOptions readOptions = new ReadOptions();) {
    +                           // TODO: can I do this with try-with-resource 
or do I always have to call
    +                           // db.releaseSnapshot()
    +                           // I think I can't use try-with-resource 
anyways since I have to set the snapshot
    +                           // on the ReadOptions
    +
    +                           rocksDBSnapshot = db.getSnapshot();
    +                           readOptions.setSnapshot(rocksDBSnapshot);
    +
    +                           iterator = getRocksIterator(db, stateInfo.f0, 
readOptions);
    +                           iterator.seekToFirst();
    +
    +                           while (iterator.isValid()) {
    +
    +                                   byte[] serializedValue = 
iterator.value();
    +
    +                                   byte[] migratedSerializedValue = 
rocksDBState.migrateSerializedValue(
    +                                           serializedValue,
    +                                           
restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot().restoreSerializer(),
    +                                           stateDesc.getSerializer());
    +
    +                                   db.put(stateInfo.f0, iterator.key(), 
migratedSerializedValue);
    +
    +                                   iterator.next();
    +                           }
    +                   } finally {
    +                           if (iterator != null) {
    +                                   iterator.close();
    +                           }
    +                           if (rocksDBSnapshot != null) {
    +                                   db.releaseSnapshot(rocksDBSnapshot);
    +                                   // TODO: do I need to call close() or 
is calling db.releaseSnapshot() enough
    +                                   rocksDBSnapshot.close();
    --- End diff --
    
    It is backed by a native objects, so always call `close` if the method is 
there


---

Reply via email to