Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6325#discussion_r202293217
--- Diff:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
@@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
+ private <N, S extends State, SV> RegisteredKeyedBackendStateMetaInfo<N,
SV> migrateStateIfNecessary(
+ StateDescriptor<S, SV> stateDesc,
+ TypeSerializer<N> namespaceSerializer,
+ Tuple2<ColumnFamilyHandle,
RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo) throws Exception {
+
+ @SuppressWarnings("unchecked")
+ RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>
restoredMetaInfoSnapshot =
+ (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>)
restoredKvStateMetaInfos.get(
+ stateDesc.getName());
+
+ Preconditions.checkState(
+ restoredMetaInfoSnapshot != null,
+ "Requested to check compatibility of a restored
RegisteredKeyedBackendStateMetaInfo," +
+ " but its corresponding restored snapshot
cannot be found.");
+
+ StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot,
stateDesc);
+
+ TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
+
+ RegisteredKeyedBackendStateMetaInfo<N, SV> newMetaInfo = new
RegisteredKeyedBackendStateMetaInfo<>(
+ stateDesc.getType(),
+ stateDesc.getName(),
+ namespaceSerializer,
+ stateSerializer);
+
+ // check compatibility results to determine if state migration
is required
+ TypeSerializerSchemaCompatibility<N> namespaceCompatibility =
CompatibilityUtil.resolveCompatibilityResult(
+
restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
+ namespaceSerializer);
+
+ TypeSerializerSchemaCompatibility<SV> stateCompatibility =
CompatibilityUtil.resolveCompatibilityResult(
+
restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
+ stateSerializer);
+
+ if (namespaceCompatibility.isIncompatible()) {
+ throw new UnsupportedOperationException(
+ "Changing the namespace TypeSerializer in an
incompatible way is currently not supported.");
+ }
+
+ if (stateCompatibility.isIncompatible()) {
+ if
(stateDesc.getType().equals(StateDescriptor.Type.MAP)) {
+ throw new UnsupportedOperationException(
+ "Changing the TypeSerializers of a
MapState in an incompatible way is currently not supported.");
+ }
+
+ LOG.info(
+ "Performing state migration for state {}
because the state serializer changed in an incompatible way.",
+ stateDesc);
+
+ // we need to get an actual state instance because
migration is different
+ // for different state types. For example, ListState
needs to deal with
+ // individual elements
+ StateFactory stateFactory =
STATE_FACTORIES.get(stateDesc.getClass());
+ if (stateFactory == null) {
+ String message = String.format("State %s is not
supported by %s",
+ stateDesc.getClass(), this.getClass());
+ throw new FlinkRuntimeException(message);
+ }
+
+ State state = stateFactory.createState(
+ stateDesc,
+ Tuple2.of(stateInfo.f0, newMetaInfo),
+ RocksDBKeyedStateBackend.this);
+
+ if (!(state instanceof AbstractRocksDBState)) {
+ throw new FlinkRuntimeException(
+ "State should be an
AbstractRocksDBState but is " + state);
+ }
+
+ AbstractRocksDBState rocksDBState =
(AbstractRocksDBState<?, N, ?, S>) state;
+
+ Snapshot rocksDBSnapshot = null;
+ RocksIteratorWrapper iterator = null;
+
+ try (ReadOptions readOptions = new ReadOptions();) {
--- End diff --
I would suggest to try this:
```
Snapshot rocksDBSnapshot = db.getSnapshot();
try (
ReadOptions readOptions = new
ReadOptions().setSnapshot(rocksDBSnapshot);
RocksIteratorWrapper iterator =
getRocksIterator(db, stateInfo.f0, readOptions)) {
iterator.seekToFirst();
(...)
} finally {
db.releaseSnapshot(rocksDBSnapshot);
rocksDBSnapshot.close();
}
```
---