[ https://issues.apache.org/jira/browse/FLINK-9376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542770#comment-16542770 ]
ASF GitHub Bot commented on FLINK-9376:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6325#discussion_r202289229
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
         return Tuple2.of(stateInfo.f0, newMetaInfo);
     }
+    private <N, S extends State, SV> RegisteredKeyedBackendStateMetaInfo<N, SV> migrateStateIfNecessary(
+            StateDescriptor<S, SV> stateDesc,
+            TypeSerializer<N> namespaceSerializer,
+            Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo) throws Exception {
+
+        @SuppressWarnings("unchecked")
+        RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV> restoredMetaInfoSnapshot =
+            (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>) restoredKvStateMetaInfos.get(
+                stateDesc.getName());
+
+        Preconditions.checkState(
+            restoredMetaInfoSnapshot != null,
+            "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
+                " but its corresponding restored snapshot cannot be found.");
+
+        StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, stateDesc);
+
+        TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
+
+        RegisteredKeyedBackendStateMetaInfo<N, SV> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+            stateDesc.getType(),
+            stateDesc.getName(),
+            namespaceSerializer,
+            stateSerializer);
+
+        // check compatibility results to determine if state migration is required
+        TypeSerializerSchemaCompatibility<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+            restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
+            namespaceSerializer);
+
+        TypeSerializerSchemaCompatibility<SV> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+            restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
+            stateSerializer);
+
+        if (namespaceCompatibility.isIncompatible()) {
+            throw new UnsupportedOperationException(
+                "Changing the namespace TypeSerializer in an incompatible way is currently not supported.");
+        }
+
+        if (stateCompatibility.isIncompatible()) {
--- End diff --
The handling of this branch could perhaps go into its own private method, to
break this big monolithic method down a bit.
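A minimal sketch of the suggested extraction, assuming (per the ticket description) that the incompatible-state branch is where the migration happens; the diff ends before the branch body, so its actual contents are not shown here. `Compat`, `StateRegistration`, and `migrateStateValues` are hypothetical stand-ins, not Flink classes: only the `isIncompatible()` query from the diff is modeled, and the helper merely records the state name where a real read/rewrite loop would go.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TypeSerializerSchemaCompatibility: only the
// isIncompatible() query used in the diff is modeled.
final class Compat {
    private final boolean incompatibleFlag;

    private Compat(boolean incompatibleFlag) {
        this.incompatibleFlag = incompatibleFlag;
    }

    static Compat compatible() { return new Compat(false); }
    static Compat incompatible() { return new Compat(true); }

    boolean isIncompatible() { return incompatibleFlag; }
}

final class StateRegistration {
    // Names of states that went through the migration branch (for observability).
    final List<String> migratedStates = new ArrayList<>();

    // Top-level method: check the compatibility results, then delegate.
    void migrateStateIfNecessary(String stateName, Compat namespaceCompat, Compat stateCompat) {
        if (namespaceCompat.isIncompatible()) {
            throw new UnsupportedOperationException(
                "Changing the namespace TypeSerializer in an incompatible way is currently not supported.");
        }
        if (stateCompat.isIncompatible()) {
            migrateStateValues(stateName);
        }
    }

    // Hypothetical extracted helper: in the backend, this is where the
    // read-with-old / write-with-new loop over the state's entries would go.
    private void migrateStateValues(String stateName) {
        migratedStates.add(stateName);
    }

    public static void main(String[] args) {
        StateRegistration registration = new StateRegistration();
        registration.migrateStateIfNecessary("counts", Compat.compatible(), Compat.incompatible());
        registration.migrateStateIfNecessary("names", Compat.compatible(), Compat.compatible());
        System.out.println(registration.migratedStates); // prints [counts]
    }
}
```

With the branch extracted, the top-level method reads as a short sequence of checks, and the migration logic can be reviewed and tested on its own.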
> Allow upgrading to incompatible state serializers (state schema evolution)
> --------------------------------------------------------------------------
>
> Key: FLINK-9376
> URL: https://issues.apache.org/jira/browse/FLINK-9376
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing, Type Serialization System
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Currently, users can upgrade state serializers on the restore run
> of a stateful job, as long as the upgraded serializer remains backwards
> compatible with all previously written data in the savepoint (i.e., it can read
> all previous and current schemas of serialized state objects).
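To illustrate what "backwards compatible" means here, a sketch of a hypothetical serializer for a state type whose schema gained a field: the v2 serializer writes the new layout but can still read bytes written by v1. `Account`, `AccountSerializerV2`, and the `writtenVersion` parameter are assumptions for illustration only; the diff above resolves compatibility against the restored serializer config snapshot rather than passing a version number around like this, and the default currency for old records is an arbitrary choice.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical state type: schema v1 stored only `balance`; v2 adds `currency`.
final class Account {
    final int balance;
    final String currency;

    Account(int balance, String currency) {
        this.balance = balance;
        this.currency = currency;
    }
}

// Hypothetical v2 serializer that stays backwards compatible with v1 data.
final class AccountSerializerV2 {

    // Writes the current (v2) schema: balance followed by currency.
    byte[] serialize(Account account) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(account.balance);
            out.writeUTF(account.currency);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // `writtenVersion` is assumed to be known from the restored snapshot.
    // Being able to read every previous version is what makes this a
    // backwards-compatible upgrade.
    Account deserialize(byte[] data, int writtenVersion) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int balance = in.readInt();
            switch (writtenVersion) {
                case 1:
                    return new Account(balance, "EUR"); // v1 had no currency; default assumed
                case 2:
                    return new Account(balance, in.readUTF());
                default:
                    throw new IllegalArgumentException("Unknown schema version " + writtenVersion);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        AccountSerializerV2 serializer = new AccountSerializerV2();
        byte[] v1Bytes = {0, 0, 0, 100}; // v1 layout: a single big-endian int
        Account restored = serializer.deserialize(v1Bytes, 1);
        System.out.println(restored.balance + " " + restored.currency); // prints 100 EUR
        byte[] v2Bytes = serializer.serialize(new Account(7, "USD"));
        System.out.println(serializer.deserialize(v2Bytes, 2).currency); // prints USD
    }
}
```

A serializer that could no longer read the v1 layout would be an incompatible upgrade, which is the case this ticket addresses.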
> What is still lacking is the ability to upgrade to incompatible serializers.
> When an incompatible serializer is registered for existing restored state,
> that state needs to go through the following process:
> 1. read the serialized state with the previous serializer,
> 2. pass each deserialized state object through a “migration map function”, and
> 3. write the state back with the new serializer.
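The three steps can be sketched over a simple byte-oriented store. This is a minimal sketch, assuming a hypothetical `Serializer<T>` interface (not Flink's `TypeSerializer`) and a `Map<String, byte[]>` standing in for a RocksDB column family. Both example serializers handle `Integer` in different formats, so the migration map function is just the identity, matching this ticket's scope of schema evolution without state type changes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical minimal serializer interface (not Flink's TypeSerializer).
interface Serializer<T> {
    byte[] serialize(T value);
    T deserialize(byte[] bytes);
}

final class StateMigration {

    // Rewrites every entry of a byte-oriented store in place.
    static <A, B> void migrate(
            Map<String, byte[]> store,
            Serializer<A> previousSerializer,
            Function<A, B> migrationMapFunction,
            Serializer<B> newSerializer) {
        for (Map.Entry<String, byte[]> entry : store.entrySet()) {
            A oldValue = previousSerializer.deserialize(entry.getValue()); // 1. read with previous serializer
            B newValue = migrationMapFunction.apply(oldValue);             // 2. migration map function
            entry.setValue(newSerializer.serialize(newValue));             // 3. write back with new serializer
        }
    }

    // Previous format: 4-byte big-endian int.
    static final Serializer<Integer> INT_BINARY = new Serializer<Integer>() {
        public byte[] serialize(Integer value) { return ByteBuffer.allocate(4).putInt(value).array(); }
        public Integer deserialize(byte[] bytes) { return ByteBuffer.wrap(bytes).getInt(); }
    };

    // New, incompatible format: decimal text in UTF-8.
    static final Serializer<Integer> INT_TEXT = new Serializer<Integer>() {
        public byte[] serialize(Integer value) { return value.toString().getBytes(StandardCharsets.UTF_8); }
        public Integer deserialize(byte[] bytes) { return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)); }
    };

    public static void main(String[] args) {
        Map<String, byte[]> store = new LinkedHashMap<>();
        store.put("a", INT_BINARY.serialize(42));
        store.put("b", INT_BINARY.serialize(7));

        // Same state type, different format: the map function is the identity.
        migrate(store, INT_BINARY, Function.identity(), INT_TEXT);

        System.out.println(INT_TEXT.deserialize(store.get("a"))); // prints 42
    }
}
```

A map function that changes the type (e.g. `Integer` to `String`, paired with a matching new serializer) would be a state type migration, which the ticket leaves out of scope.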
> The availability of this process should be strictly limited to state
> registrations that occur before the actual processing begins (e.g. in the
> {{open}} or {{initializeState}} methods), so that we avoid performing these
> operations during processing.
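One way to enforce that restriction is to make migration conditional on a "processing started" flag. The sketch below is hypothetical: `RegistrationGuardedBackend` is not a Flink class, and it treats any change in an integer serializer version as requiring migration, purely to keep the example small.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical backend that allows serializer migration only while states are
// registered before processing starts (e.g. from open()/initializeState()).
final class RegistrationGuardedBackend {
    // Serializer version last seen per state: initially taken from the restored
    // snapshot, then updated on each registration. Any version change is
    // treated as incompatible, purely for illustration.
    private final Map<String, Integer> knownSerializerVersions = new HashMap<>();
    private boolean processingStarted = false;
    int migrationCount = 0;

    RegistrationGuardedBackend(Map<String, Integer> restored) {
        knownSerializerVersions.putAll(restored);
    }

    void registerState(String name, int serializerVersion) {
        Integer knownVersion = knownSerializerVersions.get(name);
        boolean needsMigration = knownVersion != null && knownVersion != serializerVersion;
        if (needsMigration && processingStarted) {
            throw new IllegalStateException(
                "State '" + name + "' needs migration, but processing has already started.");
        }
        if (needsMigration) {
            migrationCount++; // placeholder for the read / map / write loop
        }
        knownSerializerVersions.put(name, serializerVersion);
    }

    // Called once the operator begins processing records.
    void markProcessingStarted() {
        processingStarted = true;
    }

    public static void main(String[] args) {
        Map<String, Integer> restored = new HashMap<>();
        restored.put("counts", 1);
        RegistrationGuardedBackend backend = new RegistrationGuardedBackend(restored);

        backend.registerState("counts", 2); // still registering: migration allowed
        backend.markProcessingStarted();
        try {
            backend.registerState("counts", 3); // would migrate mid-processing: rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(backend.migrationCount); // prints 1
    }
}
```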
> How this procedure actually plays out differs across types of state
> backends.
> For example, for state backends that eagerly deserialize / lazily serialize
> state (e.g. {{HeapStateBackend}}), the job execution itself can be seen as a
> "migration"; everything is deserialized to state objects on restore, and is
> only serialized again, with the new serializer, on checkpoints.
> Therefore, for these state backends, the above process is irrelevant.
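For a heap-style backend, the "migration" falls out of normal operation, as a minimal sketch shows. `HeapLikeBackend` is hypothetical and uses plain `Function`s as serializers: state is deserialized once on restore, registering a new serializer only swaps a reference, and bytes in the new format appear only when a checkpoint is taken.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical heap-style backend: values live as deserialized objects,
// and bytes are produced only when a checkpoint is taken.
final class HeapLikeBackend<T> {
    private final Map<String, T> values = new HashMap<>();
    private Function<T, byte[]> serializer;

    // Restore: eagerly deserialize everything with the previous format.
    HeapLikeBackend(Map<String, byte[]> snapshot,
                    Function<byte[], T> previousDeserializer,
                    Function<T, byte[]> previousSerializer) {
        snapshot.forEach((key, bytes) -> values.put(key, previousDeserializer.apply(bytes)));
        this.serializer = previousSerializer;
    }

    // Registering a new (even incompatible) serializer rewrites nothing.
    void registerSerializer(Function<T, byte[]> newSerializer) {
        this.serializer = newSerializer;
    }

    // Checkpoint: lazily serialize every object with the current serializer.
    Map<String, byte[]> checkpoint() {
        Map<String, byte[]> out = new HashMap<>();
        values.forEach((key, value) -> out.put(key, serializer.apply(value)));
        return out;
    }

    public static void main(String[] args) {
        Map<String, byte[]> snapshot = new HashMap<>();
        snapshot.put("a", ByteBuffer.allocate(4).putInt(42).array()); // previous format: 4-byte int

        HeapLikeBackend<Integer> backend = new HeapLikeBackend<Integer>(
                snapshot,
                bytes -> ByteBuffer.wrap(bytes).getInt(),
                value -> ByteBuffer.allocate(4).putInt(value).array());

        // New, incompatible format (decimal text): no migration pass is needed.
        backend.registerSerializer(value -> value.toString().getBytes(StandardCharsets.UTF_8));

        byte[] written = backend.checkpoint().get("a");
        System.out.println(new String(written, StandardCharsets.UTF_8)); // prints 42
    }
}
```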
> On the other hand, for state backends that lazily deserialize / eagerly
> serialize state (e.g. {{RocksDBStateBackend}}), the state evolution process
> needs to happen for every state with a newly registered incompatible
> serializer.
> Step 2 would even allow state type migrations, but that is out of scope
> for this JIRA.
> This ticket focuses only on steps 1 and 3, where we try to enable
> schema evolution without state type changes.
> This is an umbrella JIRA ticket that tracks this feature, including a few
> preliminary tasks that work towards enabling it.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)