[FLINK-6804] [state] Consistent state migration behaviour across state backends

Prior to this commit, memory and non-memory state backends behaved
differently w.r.t. state migration. For the memory backends, we did
not require the new serializer to be compatible in order for the job to
proceed after restore, because all state has already been deserialized
to objects and the new serializer can always just be used as is.
Therefore, the compatibility checks were not performed for the memory
backends, resulting in different code paths between the different state
backends.

However, this inconsistent behaviour across backends will be confusing
for users. This commit adds the code path to check the newly registered
serializer's compatibility in the memory backends (even though it isn't
required), and deliberately fails the job if the new serializer is
incompatible.

Note that the compatibility code paths will be truly unified and
required for all backends once we have eager state registration.

This closes #4073.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0f2e99b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0f2e99b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0f2e99b

Branch: refs/heads/master
Commit: f0f2e99b6c829c4f4e2ca47c7647a64fe0c9d808
Parents: ae285f9
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Sun Jun 4 22:40:26 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Jun 13 06:38:35 2017 +0200

----------------------------------------------------------------------
 .../api/common/typeutils/CompatibilityUtil.java |   8 +-
 .../state/DefaultOperatorStateBackend.java      | 119 +++++++++++++++----
 .../state/heap/HeapKeyedStateBackend.java       |  54 ++++++++-
 3 files changed, 151 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0f2e99b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
index 94bb9bd..df7f216 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
@@ -66,13 +66,11 @@ public class CompatibilityUtil {
                        } else {
                                if (precedingSerializer != null && 
!(precedingSerializer.getClass().equals(dummySerializerClassTag))) {
                                        // if the preceding serializer exists 
and is not a dummy, use
-                                       // that for converting instead of the 
provided convert deserializer
+                                       // that for converting instead of any 
provided convert deserializer
                                        return 
CompatibilityResult.requiresMigration((TypeSerializer<T>) precedingSerializer);
-                               } else if 
(initialResult.getConvertDeserializer() != null) {
-                                       return initialResult;
                                } else {
-                                       throw new RuntimeException(
-                                               "State migration required, but 
there is no available serializer capable of reading previous data.");
+                                       // requires migration (may or may not 
have a convert deserializer)
+                                       return initialResult;
                                }
                        }
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/f0f2e99b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 0f96dac..b16ac06 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -23,6 +23,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -36,6 +38,7 @@ import 
org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,6 +95,26 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
         */
        private final boolean asynchronousSnapshots;
 
+       /**
+        * Map of state names to their corresponding restored state meta info.
+        *
+        * <p>TODO this map can be removed when eager-state registration is in 
place.
+        * TODO we currently need this cached to check state migration 
strategies when new serializers are registered.
+        */
+       private final Map<String, 
RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredStateMetaInfos;
+
+       /**
+        * Cache of already accessed states.
+        *
+        * <p>In contrast to {@link #registeredStates} and {@link 
#restoredStateMetaInfos} which may be repopulated
+        * with restored state, this map is always empty at the beginning.
+        *
+        * <p>TODO this map should be moved to a base class once we have proper 
hierarchy for the operator state backends.
+        *
+        * @see <a 
href="https://issues.apache.org/jira/browse/FLINK-6849">FLINK-6849</a>
+        */
+       private final HashMap<String, PartitionableListState<?>> 
accessedStatesByName;
+
        public DefaultOperatorStateBackend(
                ClassLoader userClassLoader,
                ExecutionConfig executionConfig,
@@ -103,6 +126,8 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                this.javaSerializer = new JavaSerializer<>();
                this.registeredStates = new HashMap<>();
                this.asynchronousSnapshots = asynchronousSnapshots;
+               this.accessedStatesByName = new HashMap<>();
+               this.restoredStateMetaInfos = new HashMap<>();
        }
 
        public ExecutionConfig getExecutionConfig() {
@@ -314,6 +339,8 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                                                        " not be loaded. This 
is a temporary restriction that will be fixed in future versions.");
                                        }
 
+                                       
restoredStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
+
                                        PartitionableListState<?> listState = 
registeredStates.get(restoredMetaInfo.getName());
 
                                        if (null == listState) {
@@ -359,7 +386,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                /**
                 * Meta information of the state, including state name, 
assignment mode, and serializer
                 */
-               private final RegisteredOperatorBackendStateMetaInfo<S> 
stateMetaInfo;
+               private RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
 
                /**
                 * The internal list the holds the elements of the state
@@ -389,12 +416,12 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                        this(toCopy.stateMetaInfo, 
toCopy.internalListCopySerializer.copy(toCopy.internalList));
                }
 
-               public RegisteredOperatorBackendStateMetaInfo<S> 
getStateMetaInfo() {
-                       return stateMetaInfo;
+               public void 
setStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
+                       this.stateMetaInfo = stateMetaInfo;
                }
 
-               public List<S> getInternalList() {
-                       return internalList;
+               public RegisteredOperatorBackendStateMetaInfo<S> 
getStateMetaInfo() {
+                       return stateMetaInfo;
                }
 
                public PartitionableListState<S> deepCopy() {
@@ -441,19 +468,32 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
        }
 
        private <S> ListState<S> getListState(
-               ListStateDescriptor<S> stateDescriptor,
-               OperatorStateHandle.Mode mode) throws IOException {
+                       ListStateDescriptor<S> stateDescriptor,
+                       OperatorStateHandle.Mode mode) throws IOException, 
StateMigrationException {
+
                Preconditions.checkNotNull(stateDescriptor);
+               String name = 
Preconditions.checkNotNull(stateDescriptor.getName());
 
-               
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
+               @SuppressWarnings("unchecked")
+               PartitionableListState<S> previous = 
(PartitionableListState<S>) accessedStatesByName.get(name);
+               if (previous != null) {
+                       checkStateNameAndMode(previous.getStateMetaInfo(), 
name, mode);
+                       return previous;
+               }
 
-               String name = 
Preconditions.checkNotNull(stateDescriptor.getName());
+               // end up here if its the first time access after execution for 
the
+               // provided state name; check compatibility of restored state, 
if any
+               // TODO with eager registration in place, these checks should 
be moved to restore()
+
+               
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
                TypeSerializer<S> partitionStateSerializer = 
Preconditions.checkNotNull(stateDescriptor.getElementSerializer());
 
                @SuppressWarnings("unchecked")
                PartitionableListState<S> partitionableListState = 
(PartitionableListState<S>) registeredStates.get(name);
 
                if (null == partitionableListState) {
+                       // no restored state for the state name; simply create 
new state holder
+
                        partitionableListState = new PartitionableListState<>(
                                new RegisteredOperatorBackendStateMetaInfo<>(
                                        name,
@@ -462,21 +502,38 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
 
                        registeredStates.put(name, partitionableListState);
                } else {
-                       // TODO with eager registration in place, these checks 
should be moved to restore()
-
-                       Preconditions.checkState(
-                               
partitionableListState.getStateMetaInfo().getName().equals(name),
-                               "Incompatible state names. " +
-                                       "Was [" + 
partitionableListState.getStateMetaInfo().getName() + "], " +
-                                       "registered with [" + name + "].");
-
-                       Preconditions.checkState(
-                               
partitionableListState.getStateMetaInfo().getAssignmentMode().equals(mode),
-                               "Incompatible state assignment modes. " +
-                                       "Was [" + 
partitionableListState.getStateMetaInfo().getAssignmentMode() + "], " +
-                                       "registered with [" + mode + "].");
+                       // has restored state; check compatibility of new state 
access
+
+                       
checkStateNameAndMode(partitionableListState.getStateMetaInfo(), name, mode);
+
+                       @SuppressWarnings("unchecked")
+                       RegisteredOperatorBackendStateMetaInfo.Snapshot<S> 
restoredMetaInfo =
+                               
(RegisteredOperatorBackendStateMetaInfo.Snapshot<S>) 
restoredStateMetaInfos.get(name);
+
+                       // check compatibility to determine if state migration 
is required
+                       CompatibilityResult<S> stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+                                       
restoredMetaInfo.getPartitionStateSerializer(),
+                                       UnloadableDummyTypeSerializer.class,
+                                       
restoredMetaInfo.getPartitionStateSerializerConfigSnapshot(),
+                                       partitionStateSerializer);
+
+                       if (!stateCompatibility.isRequiresMigration()) {
+                               // new serializer is compatible; use it to 
replace the old serializer
+                               partitionableListState.setStateMetaInfo(
+                                       new 
RegisteredOperatorBackendStateMetaInfo<>(name, partitionStateSerializer, mode));
+                       } else {
+                               // TODO state migration currently isn't 
possible.
+
+                               // NOTE: for heap backends, it is actually fine 
to proceed here without failing the restore,
+                               // since the state has already been 
deserialized to objects and we can just continue with
+                               // the new serializer; we're deliberately 
failing here for now to have equal functionality with
+                               // the RocksDB backend to avoid confusion for 
users.
+
+                               throw new StateMigrationException("State 
migration isn't supported, yet.");
+                       }
                }
 
+               accessedStatesByName.put(name, partitionableListState);
                return partitionableListState;
        }
 
@@ -497,4 +554,22 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                        }
                }
        }
+
+       private static void checkStateNameAndMode(
+                       RegisteredOperatorBackendStateMetaInfo previousMetaInfo,
+                       String expectedName,
+                       OperatorStateHandle.Mode expectedMode) {
+
+               Preconditions.checkState(
+                       previousMetaInfo.getName().equals(expectedName),
+                       "Incompatible state names. " +
+                               "Was [" + previousMetaInfo.getName() + "], " +
+                               "registered with [" + expectedName + "].");
+
+               Preconditions.checkState(
+                       
previousMetaInfo.getAssignmentMode().equals(expectedMode),
+                       "Incompatible state assignment modes. " +
+                               "Was [" + previousMetaInfo.getAssignmentMode() 
+ "], " +
+                               "registered with [" + expectedMode + "].");
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f0f2e99b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 2ab9691..35a70bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
@@ -36,6 +37,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
 import org.apache.flink.migration.MigrationUtil;
 import org.apache.flink.migration.runtime.state.KvStateSnapshot;
 import 
org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot;
@@ -98,6 +100,15 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        private final HashMap<String, StateTable<K, ?, ?>> stateTables = new 
HashMap<>();
 
        /**
+        * Map of state names to their corresponding restored state meta info.
+        *
+        * <p>
+        * TODO this map can be removed when eager-state registration is in 
place.
+        * TODO we currently need this cached to check state migration 
strategies when new serializers are registered.
+        */
+       private final Map<String, 
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
+
+       /**
         * Determines whether or not we run snapshots asynchronously. This 
impacts the choice of the underlying
         * {@link StateTable} implementation.
         */
@@ -115,6 +126,8 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                super(kvStateRegistry, keySerializer, userCodeClassLoader, 
numberOfKeyGroups, keyGroupRange, executionConfig);
                this.asynchronousSnapshots = asynchronousSnapshots;
                LOG.info("Initializing heap keyed state backend with stream 
factory.");
+
+               this.restoredKvStateMetaInfos = new HashMap<>();
        }
 
        // 
------------------------------------------------------------------------
@@ -122,7 +135,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        // 
------------------------------------------------------------------------
 
        private <N, V> StateTable<K, N, V> tryRegisterStateTable(
-                       TypeSerializer<N> namespaceSerializer, 
StateDescriptor<?, V> stateDesc) {
+                       TypeSerializer<N> namespaceSerializer, 
StateDescriptor<?, V> stateDesc) throws StateMigrationException {
 
                return tryRegisterStateTable(
                                stateDesc.getName(), stateDesc.getType(),
@@ -133,7 +146,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        String stateName,
                        StateDescriptor.Type stateType,
                        TypeSerializer<N> namespaceSerializer,
-                       TypeSerializer<V> valueSerializer) {
+                       TypeSerializer<V> valueSerializer) throws 
StateMigrationException {
 
                final RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo =
                                new 
RegisteredKeyedBackendStateMetaInfo<>(stateType, stateName, 
namespaceSerializer, valueSerializer);
@@ -163,7 +176,36 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                "registered with [" + 
newMetaInfo.getStateType() + "].");
                        }
 
-                       stateTable.setMetaInfo(newMetaInfo);
+                       @SuppressWarnings("unchecked")
+                       RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> 
restoredMetaInfo =
+                               
(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) 
restoredKvStateMetaInfos.get(stateName);
+
+                       // check compatibility results to determine if state 
migration is required
+                       CompatibilityResult<N> namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+                                       
restoredMetaInfo.getNamespaceSerializer(),
+                                       MigrationNamespaceSerializerProxy.class,
+                                       
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
+                                       newMetaInfo.getNamespaceSerializer());
+
+                       CompatibilityResult<V> stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+                                       restoredMetaInfo.getStateSerializer(),
+                                       UnloadableDummyTypeSerializer.class,
+                                       
restoredMetaInfo.getStateSerializerConfigSnapshot(),
+                                       newMetaInfo.getStateSerializer());
+
+                       if (!namespaceCompatibility.isRequiresMigration() && 
!stateCompatibility.isRequiresMigration()) {
+                               // new serializers are compatible; use them to 
replace the old serializers
+                               stateTable.setMetaInfo(newMetaInfo);
+                       } else {
+                               // TODO state migration currently isn't 
possible.
+
+                               // NOTE: for heap backends, it is actually fine 
to proceed here without failing the restore,
+                               // since the state has already been 
deserialized to objects and we can just continue with
+                               // the new serializer; we're deliberately 
failing here for now to have equal functionality with
+                               // the RocksDB backend to avoid confusion for 
users.
+
+                               throw new StateMigrationException("State 
migration isn't supported, yet.");
+                       }
                }
 
                return stateTable;
@@ -427,6 +469,8 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                        " in future versions.");
                                        }
 
+                                       
restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
+
                                        StateTable<K, ?, ?> stateTable = 
stateTables.get(restoredMetaInfo.getName());
 
                                        //important: only create a new table we 
did not already create it previously
@@ -528,6 +572,10 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                MigrationRestoreSnapshot<K, ?, ?> stateSnapshot 
= (MigrationRestoreSnapshot<K, ?, ?>) genericSnapshot;
                                final StateTable rawResultMap =
                                                
stateSnapshot.deserialize(stateName, this);
+
+                               // mimic a restored kv state meta info
+                               restoredKvStateMetaInfos.put(stateName, 
rawResultMap.getMetaInfo().snapshot());
+
                                // add named state to the backend
                                stateTables.put(stateName, rawResultMap);
                        } else {

Reply via email to