[FLINK-6804] [state] Consistent state migration behaviour across state backends
Prior to this commit, memory and non-memory state backends behaved differently
w.r.t. state migration. For the memory backends, we did not require the new
serializer to be compatible in order for the job to proceed after restore,
because all state has already been deserialized into objects and the new
serializer can always be used as is. Therefore, the compatibility checks were
not performed for the memory backends, resulting in different code paths
across the state backends.

However, this inconsistent behaviour across backends is confusing for users.
This commit adds the code path that checks the newly registered serializer's
compatibility in the memory backends (even though it isn't strictly required
there), and deliberately fails the job if the new serializer is incompatible.

Note that the compatibility code paths will be truly unified and required for
all backends once we have eager state registration.

This closes #4073.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0f2e99b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0f2e99b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0f2e99b

Branch: refs/heads/master
Commit: f0f2e99b6c829c4f4e2ca47c7647a64fe0c9d808
Parents: ae285f9
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Sun Jun 4 22:40:26 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Jun 13 06:38:35 2017 +0200

----------------------------------------------------------------------
 .../api/common/typeutils/CompatibilityUtil.java |   8 +-
 .../state/DefaultOperatorStateBackend.java      | 119 +++++++++++++++----
 .../state/heap/HeapKeyedStateBackend.java       |  54 ++++++++-
 3 files changed, 151 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0f2e99b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
index 94bb9bd..df7f216 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
@@ -66,13 +66,11 @@ public class CompatibilityUtil {
 		} else {
 			if (precedingSerializer != null && !(precedingSerializer.getClass().equals(dummySerializerClassTag))) {
 				// if the preceding serializer exists and is not a dummy, use
-				// that for converting instead of the provided convert deserializer
+				// that for converting instead of any provided convert deserializer
 				return CompatibilityResult.requiresMigration((TypeSerializer<T>) precedingSerializer);
-			} else if (initialResult.getConvertDeserializer() != null) {
-				return initialResult;
 			} else {
-				throw new RuntimeException(
-					"State migration required, but there is no available serializer capable of reading previous data.");
+				// requires migration (may or may not have a convert deserializer)
+				return initialResult;
 			}
 		}
 	} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/f0f2e99b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 0f96dac..b16ac06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -23,6 +23,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -36,6 +38,7 @@ import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,6 +95,26 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	 */
 	private final boolean asynchronousSnapshots;

+	/**
+	 * Map of state names to their corresponding restored state meta info.
+	 *
+	 * <p>TODO this map can be removed when eager-state registration is in place.
+	 * TODO we currently need this cached to check state migration strategies when new serializers are registered.
+	 */
+	private final Map<String, RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredStateMetaInfos;
+
+	/**
+	 * Cache of already accessed states.
+	 *
+	 * <p>In contrast to {@link #registeredStates} and {@link #restoredStateMetaInfos} which may be repopulated
+	 * with restored state, this map is always empty at the beginning.
+	 *
+	 * <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends.
+	 *
+	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-6849">FLINK-6849</a>
+	 */
+	private final HashMap<String, PartitionableListState<?>> accessedStatesByName;
+
 	public DefaultOperatorStateBackend(
 			ClassLoader userClassLoader,
 			ExecutionConfig executionConfig,
@@ -103,6 +126,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		this.javaSerializer = new JavaSerializer<>();
 		this.registeredStates = new HashMap<>();
 		this.asynchronousSnapshots = asynchronousSnapshots;
+		this.accessedStatesByName = new HashMap<>();
+		this.restoredStateMetaInfos = new HashMap<>();
 	}

 	public ExecutionConfig getExecutionConfig() {
@@ -314,6 +339,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 					" not be loaded. This is a temporary restriction that will be fixed in future versions.");
 			}

+			restoredStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
+
 			PartitionableListState<?> listState = registeredStates.get(restoredMetaInfo.getName());

 			if (null == listState) {
@@ -359,7 +386,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		/**
 		 * Meta information of the state, including state name, assignment mode, and serializer
 		 */
-		private final RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
+		private RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;

 		/**
 		 * The internal list the holds the elements of the state
@@ -389,12 +416,12 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			this(toCopy.stateMetaInfo, toCopy.internalListCopySerializer.copy(toCopy.internalList));
 		}

-		public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() {
-			return stateMetaInfo;
+		public void setStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
+			this.stateMetaInfo = stateMetaInfo;
 		}

-		public List<S> getInternalList() {
-			return internalList;
+		public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() {
+			return stateMetaInfo;
 		}

 		public PartitionableListState<S> deepCopy() {
@@ -441,19 +468,32 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	}

 	private <S> ListState<S> getListState(
-			ListStateDescriptor<S> stateDescriptor,
-			OperatorStateHandle.Mode mode) throws IOException {
+			ListStateDescriptor<S> stateDescriptor,
+			OperatorStateHandle.Mode mode) throws IOException, StateMigrationException {
+		Preconditions.checkNotNull(stateDescriptor);
+		String name = Preconditions.checkNotNull(stateDescriptor.getName());

-		stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
+		@SuppressWarnings("unchecked")
+		PartitionableListState<S> previous = (PartitionableListState<S>) accessedStatesByName.get(name);
+		if (previous != null) {
+			checkStateNameAndMode(previous.getStateMetaInfo(), name, mode);
+			return previous;
+		}

-		String name = Preconditions.checkNotNull(stateDescriptor.getName());
+		// end up here if its the first time access after execution for the
+		// provided state name; check compatibility of restored state, if any
+		// TODO with eager registration in place, these checks should be moved to restore()
+
+		stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());

 		TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());

 		@SuppressWarnings("unchecked")
 		PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredStates.get(name);

 		if (null == partitionableListState) {
+			// no restored state for the state name; simply create new state holder
+
 			partitionableListState = new PartitionableListState<>(
 				new RegisteredOperatorBackendStateMetaInfo<>(
 					name,
@@ -462,21 +502,38 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {

 			registeredStates.put(name, partitionableListState);
 		} else {
-			// TODO with eager registration in place, these checks should be moved to restore()
-
-			Preconditions.checkState(
-				partitionableListState.getStateMetaInfo().getName().equals(name),
-				"Incompatible state names. " +
-					"Was [" + partitionableListState.getStateMetaInfo().getName() + "], " +
-					"registered with [" + name + "].");
-
-			Preconditions.checkState(
-				partitionableListState.getStateMetaInfo().getAssignmentMode().equals(mode),
-				"Incompatible state assignment modes. " +
" + - "Was [" + partitionableListState.getStateMetaInfo().getAssignmentMode() + "], " + - "registered with [" + mode + "]."); + // has restored state; check compatibility of new state access + + checkStateNameAndMode(partitionableListState.getStateMetaInfo(), name, mode); + + @SuppressWarnings("unchecked") + RegisteredOperatorBackendStateMetaInfo.Snapshot<S> restoredMetaInfo = + (RegisteredOperatorBackendStateMetaInfo.Snapshot<S>) restoredStateMetaInfos.get(name); + + // check compatibility to determine if state migration is required + CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfo.getPartitionStateSerializer(), + UnloadableDummyTypeSerializer.class, + restoredMetaInfo.getPartitionStateSerializerConfigSnapshot(), + partitionStateSerializer); + + if (!stateCompatibility.isRequiresMigration()) { + // new serializer is compatible; use it to replace the old serializer + partitionableListState.setStateMetaInfo( + new RegisteredOperatorBackendStateMetaInfo<>(name, partitionStateSerializer, mode)); + } else { + // TODO state migration currently isn't possible. + + // NOTE: for heap backends, it is actually fine to proceed here without failing the restore, + // since the state has already been deserialized to objects and we can just continue with + // the new serializer; we're deliberately failing here for now to have equal functionality with + // the RocksDB backend to avoid confusion for users. + + throw new StateMigrationException("State migration isn't supported, yet."); + } } + accessedStatesByName.put(name, partitionableListState); return partitionableListState; } @@ -497,4 +554,22 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { } } } + + private static void checkStateNameAndMode( + RegisteredOperatorBackendStateMetaInfo previousMetaInfo, + String expectedName, + OperatorStateHandle.Mode expectedMode) { + + Preconditions.checkState( + previousMetaInfo.getName().equals(expectedName), + "Incompatible state names. " + + "Was [" + previousMetaInfo.getName() + "], " + + "registered with [" + expectedName + "]."); + + Preconditions.checkState( + previousMetaInfo.getAssignmentMode().equals(expectedMode), + "Incompatible state assignment modes. 
" + + "Was [" + previousMetaInfo.getAssignmentMode() + "], " + + "registered with [" + expectedMode + "]."); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f0f2e99b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 2ab9691..35a70bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; @@ -36,6 +37,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.migration.MigrationNamespaceSerializerProxy; import org.apache.flink.migration.MigrationUtil; import org.apache.flink.migration.runtime.state.KvStateSnapshot; import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot; @@ -98,6 +100,15 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final HashMap<String, StateTable<K, ?, ?>> stateTables = new HashMap<>(); /** + * Map of state names to their corresponding restored state meta info. + * + * <p> + * TODO this map can be removed when eager-state registration is in place. + * TODO we currently need this cached to check state migration strategies when new serializers are registered. + */ + private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos; + + /** * Determines whether or not we run snapshots asynchronously. This impacts the choice of the underlying * {@link StateTable} implementation. 
 	 */
@@ -115,6 +126,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
 		this.asynchronousSnapshots = asynchronousSnapshots;
 		LOG.info("Initializing heap keyed state backend with stream factory.");
+
+		this.restoredKvStateMetaInfos = new HashMap<>();
 	}

 	// ------------------------------------------------------------------------
@@ -122,7 +135,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	// ------------------------------------------------------------------------

 	private <N, V> StateTable<K, N, V> tryRegisterStateTable(
-			TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) {
+			TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) throws StateMigrationException {

 		return tryRegisterStateTable(
 			stateDesc.getName(), stateDesc.getType(),
@@ -133,7 +146,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			String stateName,
 			StateDescriptor.Type stateType,
 			TypeSerializer<N> namespaceSerializer,
-			TypeSerializer<V> valueSerializer) {
+			TypeSerializer<V> valueSerializer) throws StateMigrationException {

 		final RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo =
 			new RegisteredKeyedBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
@@ -163,7 +176,36 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					"registered with [" + newMetaInfo.getStateType() + "].");
 			}

-			stateTable.setMetaInfo(newMetaInfo);
+			@SuppressWarnings("unchecked")
+			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> restoredMetaInfo =
+				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) restoredKvStateMetaInfos.get(stateName);
+
+			// check compatibility results to determine if state migration is required
+			CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+				restoredMetaInfo.getNamespaceSerializer(),
+				MigrationNamespaceSerializerProxy.class,
+				restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
+				newMetaInfo.getNamespaceSerializer());
+
+			CompatibilityResult<V> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+				restoredMetaInfo.getStateSerializer(),
+				UnloadableDummyTypeSerializer.class,
+				restoredMetaInfo.getStateSerializerConfigSnapshot(),
+				newMetaInfo.getStateSerializer());
+
+			if (!namespaceCompatibility.isRequiresMigration() && !stateCompatibility.isRequiresMigration()) {
+				// new serializers are compatible; use them to replace the old serializers
+				stateTable.setMetaInfo(newMetaInfo);
+			} else {
+				// TODO state migration currently isn't possible.
+
+				// NOTE: for heap backends, it is actually fine to proceed here without failing the restore,
+				// since the state has already been deserialized to objects and we can just continue with
+				// the new serializer; we're deliberately failing here for now to have equal functionality with
+				// the RocksDB backend to avoid confusion for users.
+
+				throw new StateMigrationException("State migration isn't supported, yet.");
+			}
 		}
 		return stateTable;
 	}
@@ -427,6 +469,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						" in future versions.");
 				}

+				restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
+
 				StateTable<K, ?, ?> stateTable = stateTables.get(restoredMetaInfo.getName());

 				//important: only create a new table we did not already create it previously
@@ -528,6 +572,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					MigrationRestoreSnapshot<K, ?, ?> stateSnapshot = (MigrationRestoreSnapshot<K, ?, ?>) genericSnapshot;
 					final StateTable rawResultMap =
 						stateSnapshot.deserialize(stateName, this);
+
+					// mimic a restored kv state meta info
+					restoredKvStateMetaInfos.put(stateName, rawResultMap.getMetaInfo().snapshot());
+
 					// add named state to the backend
 					stateTables.put(stateName, rawResultMap);
 				} else {
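
For context, the check that both memory backends now perform boils down to one pattern:
resolve the compatibility of the newly registered serializer against the restored serializer
and its config snapshot, and deliberately fail the restore if migration would be required.
The following is a minimal, hypothetical sketch (not part of the commit above); it assumes
the Flink 1.3-era serializer compatibility APIs used in the diff (CompatibilityUtil,
CompatibilityResult, UnloadableDummyTypeSerializer, StateMigrationException) are on the
classpath, and the helper class and method names are invented for illustration.

// Illustrative sketch only; not part of this commit. Class and method names are hypothetical.
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.util.StateMigrationException;

public final class SerializerCompatibilityCheck {

	private SerializerCompatibilityCheck() {}

	/**
	 * Resolves compatibility between a restored serializer (plus its config snapshot)
	 * and a newly registered serializer. If the new serializer is compatible it is
	 * returned so it can replace the old one in the state meta info; otherwise the
	 * restore fails, matching the behaviour of the RocksDB backend.
	 */
	public static <T> TypeSerializer<T> resolveOrFail(
			TypeSerializer<?> restoredSerializer,
			TypeSerializerConfigSnapshot restoredConfigSnapshot,
			TypeSerializer<T> newlyRegisteredSerializer) throws StateMigrationException {

		CompatibilityResult<T> result = CompatibilityUtil.resolveCompatibilityResult(
			restoredSerializer,
			UnloadableDummyTypeSerializer.class,
			restoredConfigSnapshot,
			newlyRegisteredSerializer);

		if (result.isRequiresMigration()) {
			// state migration is not implemented yet, so fail the job instead of
			// silently proceeding with an incompatible serializer
			throw new StateMigrationException("State migration isn't supported, yet.");
		}

		return newlyRegisteredSerializer;
	}
}

As the TODOs in the diff note, once eager state registration is in place this check can move
from the first state access to restore time, and the cached restored meta info maps go away.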
