GitHub user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6325#discussion_r202293477
--- Diff:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
@@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
+ private <N, S extends State, SV> RegisteredKeyedBackendStateMetaInfo<N,
SV> migrateStateIfNecessary(
+ StateDescriptor<S, SV> stateDesc,
+ TypeSerializer<N> namespaceSerializer,
+ Tuple2<ColumnFamilyHandle,
RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo) throws Exception {
+
+ @SuppressWarnings("unchecked")
+ RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>
restoredMetaInfoSnapshot =
+ (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>)
restoredKvStateMetaInfos.get(
+ stateDesc.getName());
+
+ Preconditions.checkState(
+ restoredMetaInfoSnapshot != null,
+ "Requested to check compatibility of a restored
RegisteredKeyedBackendStateMetaInfo," +
+ " but its corresponding restored snapshot
cannot be found.");
+
+ StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot,
stateDesc);
+
+ TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
+
+ RegisteredKeyedBackendStateMetaInfo<N, SV> newMetaInfo = new
RegisteredKeyedBackendStateMetaInfo<>(
+ stateDesc.getType(),
+ stateDesc.getName(),
+ namespaceSerializer,
+ stateSerializer);
+
+ // check compatibility results to determine if state migration
is required
+ TypeSerializerSchemaCompatibility<N> namespaceCompatibility =
CompatibilityUtil.resolveCompatibilityResult(
+
restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
+ namespaceSerializer);
+
+ TypeSerializerSchemaCompatibility<SV> stateCompatibility =
CompatibilityUtil.resolveCompatibilityResult(
+
restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
+ stateSerializer);
+
+ if (namespaceCompatibility.isIncompatible()) {
+ throw new UnsupportedOperationException(
+ "Changing the namespace TypeSerializer in an
incompatible way is currently not supported.");
+ }
+
+ if (stateCompatibility.isIncompatible()) {
+ if
(stateDesc.getType().equals(StateDescriptor.Type.MAP)) {
+ throw new UnsupportedOperationException(
+ "Changing the TypeSerializers of a
MapState in an incompatible way is currently not supported.");
+ }
+
+ LOG.info(
+ "Performing state migration for state {}
because the state serializer changed in an incompatible way.",
+ stateDesc);
+
+ // we need to get an actual state instance because
migration is different
+ // for different state types. For example, ListState
needs to deal with
+ // individual elements
+ StateFactory stateFactory =
STATE_FACTORIES.get(stateDesc.getClass());
+ if (stateFactory == null) {
+ String message = String.format("State %s is not
supported by %s",
+ stateDesc.getClass(), this.getClass());
+ throw new FlinkRuntimeException(message);
+ }
+
+ State state = stateFactory.createState(
+ stateDesc,
+ Tuple2.of(stateInfo.f0, newMetaInfo),
+ RocksDBKeyedStateBackend.this);
+
+ if (!(state instanceof AbstractRocksDBState)) {
+ throw new FlinkRuntimeException(
+ "State should be an
AbstractRocksDBState but is " + state);
+ }
+
+ AbstractRocksDBState rocksDBState =
(AbstractRocksDBState<?, N, ?, S>) state;
+
+ Snapshot rocksDBSnapshot = null;
+ RocksIteratorWrapper iterator = null;
+
+ try (ReadOptions readOptions = new ReadOptions();) {
+ // TODO: can I do this with try-with-resource
or do I always have to call
+ // db.releaseSnapshot()
+ // I think I can't use try-with-resource
anyways since I have to set the snapshot
+ // on the ReadOptions
+
+ rocksDBSnapshot = db.getSnapshot();
+ readOptions.setSnapshot(rocksDBSnapshot);
+
+ iterator = getRocksIterator(db, stateInfo.f0,
readOptions);
+ iterator.seekToFirst();
+
+ while (iterator.isValid()) {
+
+ byte[] serializedValue =
iterator.value();
+
+ byte[] migratedSerializedValue =
rocksDBState.migrateSerializedValue(
+ serializedValue,
+
restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot().restoreSerializer(),
+ stateDesc.getSerializer());
+
+ db.put(stateInfo.f0, iterator.key(),
migratedSerializedValue);
+
+ iterator.next();
+ }
+ } finally {
+ if (iterator != null) {
+ iterator.close();
+ }
+ if (rocksDBSnapshot != null) {
+ db.releaseSnapshot(rocksDBSnapshot);
+ // TODO: do I need to call close() or
is calling db.releaseSnapshot() enough
+ rocksDBSnapshot.close();
--- End diff --
It is backed by a native object, so always call `close()` if the method is there.
---