This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.7 in repository https://gitbox.apache.org/repos/asf/flink.git
commit ba346cade85511086c08c634cc34b1742d9d98c8 Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> AuthorDate: Sun Dec 9 15:15:04 2018 +0800 [FLINK-11094] [state backends] Let meta infos always lazily access restore serializer This commit introduces a StateSerializerProvider that wraps the logic for obtaining serializers for registered state, either with the previous schema of state in checkpoints or with the current schema of state. All state meta info subclasses now use StateSerializerProviders in place of direct serializer instances. This allows meta infos that were instantiated from restored serializer snapshots to avoid eagerly accessing the restore serializer when restoring state. Eager access must be avoided because, when restoring from Flink 1.6, the restore serializer might not be available; for RocksDB, this should be tolerable. --- .../RegisteredBroadcastStateBackendMetaInfo.java | 84 +++++-- .../RegisteredKeyValueStateBackendMetaInfo.java | 115 ++++++---- .../RegisteredOperatorStateBackendMetaInfo.java | 46 +++- ...egisteredPriorityQueueStateBackendMetaInfo.java | 39 +++- .../runtime/state/StateSerializerProvider.java | 245 +++++++++++++++++++++ .../flink/runtime/state/StateBackendTestBase.java | 51 +++++ .../runtime/state/StateSerializerProviderTest.java | 187 ++++++++++++++++ .../streaming/state/RocksDBKeyedStateBackend.java | 125 +++++------ 8 files changed, 739 insertions(+), 153 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java index 70a1414..95a650e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java @@ -19,11 +19,13 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.Collections; import java.util.HashMap; @@ -38,11 +40,11 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> extends RegisteredSta /** The type serializer for the keys in the map state. */ @Nonnull - private final TypeSerializer<K> keySerializer; + private final StateSerializerProvider<K> keySerializerProvider; /** The type serializer for the values in the map state. */ @Nonnull - private final TypeSerializer<V> valueSerializer; + private final StateSerializerProvider<V> valueSerializerProvider; public RegisteredBroadcastStateBackendMetaInfo( @Nonnull final String name, @@ -50,19 +52,19 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> extends RegisteredSta @Nonnull final TypeSerializer<K> keySerializer, @Nonnull final TypeSerializer<V> valueSerializer) { - super(name); - Preconditions.checkArgument(assignmentMode == OperatorStateHandle.Mode.BROADCAST); - this.assignmentMode = assignmentMode; - this.keySerializer = keySerializer; - this.valueSerializer = valueSerializer; + this( + name, + assignmentMode, + StateSerializerProvider.fromNewState(keySerializer), + StateSerializerProvider.fromNewState(valueSerializer)); } public RegisteredBroadcastStateBackendMetaInfo(@Nonnull RegisteredBroadcastStateBackendMetaInfo<K, V> copy) { this( Preconditions.checkNotNull(copy).name, copy.assignmentMode, - copy.keySerializer.duplicate(), - copy.valueSerializer.duplicate()); + copy.getKeySerializer().duplicate(), + copy.getValueSerializer().duplicate()); } @SuppressWarnings("unchecked") @@ -71,10 +73,13 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> extends RegisteredSta snapshot.getName(), 
OperatorStateHandle.Mode.valueOf( snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)), - (TypeSerializer<K>) Preconditions.checkNotNull( - snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)), - (TypeSerializer<V>) Preconditions.checkNotNull( - snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))); + StateSerializerProvider.fromRestoredState( + (TypeSerializerSnapshot<K>) Preconditions.checkNotNull( + snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER))), + StateSerializerProvider.fromRestoredState( + (TypeSerializerSnapshot<V>) Preconditions.checkNotNull( + snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)))); + Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType()); } @@ -86,6 +91,19 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> extends RegisteredSta return new RegisteredBroadcastStateBackendMetaInfo<>(this); } + private RegisteredBroadcastStateBackendMetaInfo( + @Nonnull final String name, + @Nonnull final OperatorStateHandle.Mode assignmentMode, + @Nonnull final StateSerializerProvider<K> keySerializerProvider, + @Nonnull final StateSerializerProvider<V> valueSerializerProvider) { + + super(name); + Preconditions.checkArgument(assignmentMode == OperatorStateHandle.Mode.BROADCAST); + this.assignmentMode = assignmentMode; + this.keySerializerProvider = keySerializerProvider; + this.valueSerializerProvider = valueSerializerProvider; + } + @Nonnull @Override public StateMetaInfoSnapshot snapshot() { @@ -94,12 +112,32 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> extends RegisteredSta @Nonnull public TypeSerializer<K> getKeySerializer() { - return keySerializer; + return keySerializerProvider.currentSchemaSerializer(); + } + + @Nonnull + public 
TypeSerializerSchemaCompatibility<K> updateKeySerializer(TypeSerializer<K> newKeySerializer) { + return keySerializerProvider.registerNewSerializerForRestoredState(newKeySerializer); + } + + @Nullable + public TypeSerializer<K> getPreviousKeySerializer() { + return keySerializerProvider.previousSchemaSerializer(); } @Nonnull public TypeSerializer<V> getValueSerializer() { - return valueSerializer; + return valueSerializerProvider.currentSchemaSerializer(); + } + + @Nonnull + public TypeSerializerSchemaCompatibility<V> updateValueSerializer(TypeSerializer<V> newValueSerializer) { + return valueSerializerProvider.registerNewSerializerForRestoredState(newValueSerializer); + } + + @Nullable + public TypeSerializer<V> getPreviousValueSerializer() { + return valueSerializerProvider.previousSchemaSerializer(); } @Nonnull @@ -122,16 +160,16 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> extends RegisteredSta return Objects.equals(name, other.getName()) && Objects.equals(assignmentMode, other.getAssignmentMode()) - && Objects.equals(keySerializer, other.getKeySerializer()) - && Objects.equals(valueSerializer, other.getValueSerializer()); + && Objects.equals(getKeySerializer(), other.getKeySerializer()) + && Objects.equals(getValueSerializer(), other.getValueSerializer()); } @Override public int hashCode() { int result = name.hashCode(); result = 31 * result + assignmentMode.hashCode(); - result = 31 * result + keySerializer.hashCode(); - result = 31 * result + valueSerializer.hashCode(); + result = 31 * result + getKeySerializer().hashCode(); + result = 31 * result + getValueSerializer().hashCode(); return result; } @@ -139,8 +177,8 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> extends RegisteredSta public String toString() { return "RegisteredBroadcastBackendStateMetaInfo{" + "name='" + name + '\'' + - ", keySerializer=" + keySerializer + - ", valueSerializer=" + valueSerializer + + ", keySerializer=" + getKeySerializer() + + ", 
valueSerializer=" + getValueSerializer() + ", assignmentMode=" + assignmentMode + '}'; } @@ -154,8 +192,12 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> extends RegisteredSta Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap = new HashMap<>(2); String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(); String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(); + + TypeSerializer<K> keySerializer = getKeySerializer(); serializerMap.put(keySerializerKey, keySerializer.duplicate()); serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration()); + + TypeSerializer<V> valueSerializer = getValueSerializer(); serializerMap.put(valueSerializerKey, valueSerializer.duplicate()); serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java index d05f31a..ebe8e94 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.util.Preconditions; @@ -44,18 +45,24 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStat @Nonnull private final StateDescriptor.Type stateType; @Nonnull - private 
final TypeSerializer<N> namespaceSerializer; + private final StateSerializerProvider<N> namespaceSerializerProvider; @Nonnull - private final TypeSerializer<S> stateSerializer; + private final StateSerializerProvider<S> stateSerializerProvider; @Nullable - private final StateSnapshotTransformer<S> snapshotTransformer; + private StateSnapshotTransformer<S> snapshotTransformer; public RegisteredKeyValueStateBackendMetaInfo( @Nonnull StateDescriptor.Type stateType, @Nonnull String name, @Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull TypeSerializer<S> stateSerializer) { - this(stateType, name, namespaceSerializer, stateSerializer, null); + + this( + stateType, + name, + StateSerializerProvider.fromNewState(namespaceSerializer), + StateSerializerProvider.fromNewState(stateSerializer), + null); } public RegisteredKeyValueStateBackendMetaInfo( @@ -65,11 +72,12 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStat @Nonnull TypeSerializer<S> stateSerializer, @Nullable StateSnapshotTransformer<S> snapshotTransformer) { - super(name); - this.stateType = stateType; - this.namespaceSerializer = namespaceSerializer; - this.stateSerializer = stateSerializer; - this.snapshotTransformer = snapshotTransformer; + this( + stateType, + name, + StateSerializerProvider.fromNewState(namespaceSerializer), + StateSerializerProvider.fromNewState(stateSerializer), + snapshotTransformer); } @SuppressWarnings("unchecked") @@ -77,13 +85,31 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStat this( StateDescriptor.Type.valueOf(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)), snapshot.getName(), - (TypeSerializer<N>) Preconditions.checkNotNull( - snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER)), - (TypeSerializer<S>) Preconditions.checkNotNull( - snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)), null); + 
StateSerializerProvider.fromRestoredState( + (TypeSerializerSnapshot<N>) Preconditions.checkNotNull( + snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER))), + StateSerializerProvider.fromRestoredState( + (TypeSerializerSnapshot<S>) Preconditions.checkNotNull( + snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))), + null); + Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.KEY_VALUE == snapshot.getBackendStateType()); } + private RegisteredKeyValueStateBackendMetaInfo( + @Nonnull StateDescriptor.Type stateType, + @Nonnull String name, + @Nonnull StateSerializerProvider<N> namespaceSerializerProvider, + @Nonnull StateSerializerProvider<S> stateSerializerProvider, + @Nullable StateSnapshotTransformer<S> snapshotTransformer) { + + super(name); + this.stateType = stateType; + this.namespaceSerializerProvider = namespaceSerializerProvider; + this.stateSerializerProvider = stateSerializerProvider; + this.snapshotTransformer = snapshotTransformer; + } + @Nonnull public StateDescriptor.Type getStateType() { return stateType; @@ -91,12 +117,32 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStat @Nonnull public TypeSerializer<N> getNamespaceSerializer() { - return namespaceSerializer; + return namespaceSerializerProvider.currentSchemaSerializer(); + } + + @Nonnull + public TypeSerializerSchemaCompatibility<N> updateNamespaceSerializer(TypeSerializer<N> newNamespaceSerializer) { + return namespaceSerializerProvider.registerNewSerializerForRestoredState(newNamespaceSerializer); + } + + @Nullable + public TypeSerializer<N> getPreviousNamespaceSerializer() { + return namespaceSerializerProvider.previousSchemaSerializer(); } @Nonnull public TypeSerializer<S> getStateSerializer() { - return stateSerializer; + return stateSerializerProvider.currentSchemaSerializer(); + } + + @Nonnull + public TypeSerializerSchemaCompatibility<S> 
updateStateSerializer(TypeSerializer<S> newStateSerializer) { + return stateSerializerProvider.registerNewSerializerForRestoredState(newStateSerializer); + } + + @Nullable + public TypeSerializer<S> getPreviousStateSerializer() { + return stateSerializerProvider.previousSchemaSerializer(); } @Nullable @@ -104,6 +150,10 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStat return snapshotTransformer; } + public void updateSnapshotTransformer(StateSnapshotTransformer<S> snapshotTransformer) { + this.snapshotTransformer = snapshotTransformer; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -133,8 +183,8 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStat return "RegisteredKeyedBackendStateMetaInfo{" + "stateType=" + stateType + ", name='" + name + '\'' + - ", namespaceSerializer=" + namespaceSerializer + - ", stateSerializer=" + stateSerializer + + ", namespaceSerializer=" + getNamespaceSerializer() + + ", stateSerializer=" + getStateSerializer() + '}'; } @@ -153,34 +203,19 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStat return computeSnapshot(); } - public static void checkStateMetaInfo(StateMetaInfoSnapshot stateMetaInfoSnapshot, StateDescriptor<?, ?> stateDesc) { - Preconditions.checkState( - stateMetaInfoSnapshot != null, - "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + - " but its corresponding restored snapshot cannot be found."); - - Preconditions.checkState(stateMetaInfoSnapshot.getBackendStateType() - == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE, - "Incompatible state types. 
" + - "Was [" + stateMetaInfoSnapshot.getBackendStateType() + "], " + - "registered as [" + StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "]."); + public void checkStateMetaInfo(StateDescriptor<?, ?> stateDesc) { Preconditions.checkState( - Objects.equals(stateDesc.getName(), stateMetaInfoSnapshot.getName()), + Objects.equals(stateDesc.getName(), getName()), "Incompatible state names. " + - "Was [" + stateMetaInfoSnapshot.getName() + "], " + + "Was [" + getName() + "], " + "registered with [" + stateDesc.getName() + "]."); - final StateDescriptor.Type restoredType = - StateDescriptor.Type.valueOf( - stateMetaInfoSnapshot.getOption( - StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)); - - if (stateDesc.getType() != StateDescriptor.Type.UNKNOWN && restoredType != StateDescriptor.Type.UNKNOWN) { + if (stateDesc.getType() != StateDescriptor.Type.UNKNOWN && getStateType() != StateDescriptor.Type.UNKNOWN) { Preconditions.checkState( - stateDesc.getType() == restoredType, + stateDesc.getType() == getStateType(), "Incompatible key/value state types. 
" + - "Was [" + restoredType + "], " + + "Was [" + getStateType() + "], " + "registered with [" + stateDesc.getType() + "]."); } } @@ -194,8 +229,12 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStat Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap = new HashMap<>(2); String namespaceSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString(); String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(); + + TypeSerializer<N> namespaceSerializer = getNamespaceSerializer(); serializerMap.put(namespaceSerializerKey, namespaceSerializer.duplicate()); serializerConfigSnapshotsMap.put(namespaceSerializerKey, namespaceSerializer.snapshotConfiguration()); + + TypeSerializer<S> stateSerializer = getStateSerializer(); serializerMap.put(valueSerializerKey, stateSerializer.duplicate()); serializerConfigSnapshotsMap.put(valueSerializerKey, stateSerializer.snapshotConfiguration()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java index 10ba029..afb3d77 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java @@ -19,11 +19,13 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.Collections; import java.util.Map; @@ -46,21 +48,22 @@ 
public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMe * The type serializer for the elements in the state list */ @Nonnull - private final TypeSerializer<S> partitionStateSerializer; + private final StateSerializerProvider<S> partitionStateSerializerProvider; public RegisteredOperatorStateBackendMetaInfo( @Nonnull String name, @Nonnull TypeSerializer<S> partitionStateSerializer, @Nonnull OperatorStateHandle.Mode assignmentMode) { - super(name); - this.partitionStateSerializer = partitionStateSerializer; - this.assignmentMode = assignmentMode; + this( + name, + StateSerializerProvider.fromNewState(partitionStateSerializer), + assignmentMode); } private RegisteredOperatorStateBackendMetaInfo(@Nonnull RegisteredOperatorStateBackendMetaInfo<S> copy) { this( Preconditions.checkNotNull(copy).name, - copy.partitionStateSerializer.duplicate(), + copy.getPartitionStateSerializer().duplicate(), copy.assignmentMode); } @@ -68,13 +71,24 @@ public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMe public RegisteredOperatorStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) { this( snapshot.getName(), - (TypeSerializer<S>) Preconditions.checkNotNull( - snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)), + StateSerializerProvider.fromRestoredState( + (TypeSerializerSnapshot<S>) Preconditions.checkNotNull( + snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))), OperatorStateHandle.Mode.valueOf( snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE))); + Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType()); } + private RegisteredOperatorStateBackendMetaInfo( + @Nonnull String name, + @Nonnull StateSerializerProvider<S> partitionStateSerializerProvider, + @Nonnull OperatorStateHandle.Mode assignmentMode) { + super(name); + 
this.partitionStateSerializerProvider = partitionStateSerializerProvider; + this.assignmentMode = assignmentMode; + } + /** * Creates a deep copy of the itself. */ @@ -96,7 +110,17 @@ public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMe @Nonnull public TypeSerializer<S> getPartitionStateSerializer() { - return partitionStateSerializer; + return partitionStateSerializerProvider.currentSchemaSerializer(); + } + + @Nonnull + public TypeSerializerSchemaCompatibility<S> updatePartitionStateSerializer(TypeSerializer<S> newPartitionStateSerializer) { + return partitionStateSerializerProvider.registerNewSerializerForRestoredState(newPartitionStateSerializer); + } + + @Nullable + public TypeSerializer<S> getPreviousPartitionStateSerializer() { + return partitionStateSerializerProvider.previousSchemaSerializer(); } @Override @@ -112,7 +136,7 @@ public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMe return (obj instanceof RegisteredOperatorStateBackendMetaInfo) && name.equals(((RegisteredOperatorStateBackendMetaInfo) obj).getName()) && assignmentMode.equals(((RegisteredOperatorStateBackendMetaInfo) obj).getAssignmentMode()) - && partitionStateSerializer.equals(((RegisteredOperatorStateBackendMetaInfo) obj).getPartitionStateSerializer()); + && getPartitionStateSerializer().equals(((RegisteredOperatorStateBackendMetaInfo) obj).getPartitionStateSerializer()); } @Override @@ -128,7 +152,7 @@ public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMe return "RegisteredOperatorBackendStateMetaInfo{" + "name='" + name + "\'" + ", assignmentMode=" + assignmentMode + - ", partitionStateSerializer=" + partitionStateSerializer + + ", partitionStateSerializer=" + getPartitionStateSerializer() + '}'; } @@ -138,6 +162,8 @@ public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMe StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(), assignmentMode.toString()); 
String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(); + + TypeSerializer<S> partitionStateSerializer = getPartitionStateSerializer(); Map<String, TypeSerializer<?>> serializerMap = Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate()); Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java index 0304b92..60c88e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java @@ -19,11 +19,13 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.Collections; import java.util.Map; @@ -34,24 +36,34 @@ import java.util.Map; public class RegisteredPriorityQueueStateBackendMetaInfo<T> extends RegisteredStateMetaInfoBase { @Nonnull - private final TypeSerializer<T> elementSerializer; + private final StateSerializerProvider<T> elementSerializerProvider; public RegisteredPriorityQueueStateBackendMetaInfo( @Nonnull String name, @Nonnull TypeSerializer<T> elementSerializer) { - super(name); - this.elementSerializer = elementSerializer; + this(name, StateSerializerProvider.fromNewState(elementSerializer)); } @SuppressWarnings("unchecked") public 
RegisteredPriorityQueueStateBackendMetaInfo(StateMetaInfoSnapshot snapshot) { - this(snapshot.getName(), - (TypeSerializer<T>) Preconditions.checkNotNull( - snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))); + this( + snapshot.getName(), + StateSerializerProvider.fromRestoredState( + (TypeSerializerSnapshot<T>) Preconditions.checkNotNull( + snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)))); + Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE == snapshot.getBackendStateType()); } + private RegisteredPriorityQueueStateBackendMetaInfo( + @Nonnull String name, + @Nonnull StateSerializerProvider<T> elementSerializerProvider) { + + super(name); + this.elementSerializerProvider = elementSerializerProvider; + } + @Nonnull @Override public StateMetaInfoSnapshot snapshot() { @@ -60,10 +72,21 @@ public class RegisteredPriorityQueueStateBackendMetaInfo<T> extends RegisteredSt @Nonnull public TypeSerializer<T> getElementSerializer() { - return elementSerializer; + return elementSerializerProvider.currentSchemaSerializer(); + } + + @Nonnull + public TypeSerializerSchemaCompatibility<T> updateElementSerializer(TypeSerializer<T> newElementSerializer) { + return elementSerializerProvider.registerNewSerializerForRestoredState(newElementSerializer); + } + + @Nullable + public TypeSerializer<T> getPreviousElementSerializer() { + return elementSerializerProvider.previousSchemaSerializer(); } private StateMetaInfoSnapshot computeSnapshot() { + TypeSerializer<T> elementSerializer = getElementSerializer(); Map<String, TypeSerializer<?>> serializerMap = Collections.singletonMap( StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(), @@ -82,6 +105,6 @@ public class RegisteredPriorityQueueStateBackendMetaInfo<T> extends RegisteredSt } public RegisteredPriorityQueueStateBackendMetaInfo deepCopy() { - return new 
RegisteredPriorityQueueStateBackendMetaInfo<>(name, elementSerializer.duplicate()); + return new RegisteredPriorityQueueStateBackendMetaInfo<>(name, getElementSerializer().duplicate()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java new file mode 100644 index 0000000..a24f12e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A {@link StateSerializerProvider} wraps logic on how to obtain serializers for registered state, + * either with the previous schema of state in checkpoints or the current schema of state. + * + * @param <T> the type of the state. + */ +@Internal +public abstract class StateSerializerProvider<T> { + + /** + * The registered serializer for the state. + * + * <p>In the case that this provider was created from a restored serializer snapshot via + * {@link #fromRestoredState(TypeSerializerSnapshot)}, but a new serializer was never registered + * for the state (i.e., this is the case if a restored state was never accessed), this would be {@code null}. + */ + @Nullable + TypeSerializer<T> registeredSerializer; + + /** + * Creates a {@link StateSerializerProvider} for restored state from the previous serializer's snapshot. + * + * <p>Once a new serializer is registered for the state, it should be provided via + * the {@link #registerNewSerializerForRestoredState(TypeSerializer)} method. + * + * @param stateSerializerSnapshot the previous serializer's snapshot. + * @param <T> the type of the state. + * + * @return a new {@link StateSerializerProvider} for restored state. 
+ */ + public static <T> StateSerializerProvider<T> fromRestoredState(TypeSerializerSnapshot<T> stateSerializerSnapshot) { + return new RestoredStateSerializerProvider<>(stateSerializerSnapshot); + } + + /** + * Creates a {@link StateSerializerProvider} for new state from the registered state serializer. + * + * @param registeredStateSerializer the new state's registered serializer. + * @param <T> the type of the state. + * + * @return a new {@link StateSerializerProvider} for new state. + */ + public static <T> StateSerializerProvider<T> fromNewState(TypeSerializer<T> registeredStateSerializer) { + return new NewStateSerializerProvider<>(registeredStateSerializer); + } + + private StateSerializerProvider(@Nullable TypeSerializer<T> stateSerializer) { + this.registeredSerializer = stateSerializer; + } + + /** + * Gets the serializer that recognizes the current serialization schema of the state. + * This is the serializer that should be used for regular state serialization and + * deserialization after state has been restored. + * + * <p>If this provider was created from a restored state's serializer snapshot, while a + * new serializer (with a new schema) was not registered for the state (i.e., because + * the state was never accessed after it was restored), then the schema of state remains + * identical. Therefore, in this case, it is guaranteed that the serializer returned by + * this method is the same as the one returned by {@link #previousSchemaSerializer()}. + * + * <p>If this provider was created from new state, then this always returns the + * serializer that the new state was registered with. + * + * @return a serializer that reads and writes in the current schema of the state. + */ + @Nonnull + public abstract TypeSerializer<T> currentSchemaSerializer(); + + /** + * Gets the serializer that recognizes the previous serialization schema of the state. + * This is the serializer that should be used for restoring the state, i.e. 
when the state + * is still in the previous serialization schema. + * + * <p>This method can only be used if this provider was created from a restored state's serializer + * snapshot. If this provider was created from new state, then this method is + * irrelevant, since there doesn't exist any previous version of the state schema. + * + * @return a serializer that reads and writes in the previous schema of the state. + */ + @Nonnull + public abstract TypeSerializer<T> previousSchemaSerializer(); + + /** + * For restored state, register a new serializer that potentially has a new serialization schema. + * + * <p>Users are allowed to register serializers for state only once. Therefore, this method + * is irrelevant if this provider was created from new state, since a state serializer has + * been registered already. + * + * <p>If this provider was created from restored state, then this method should + * be called at most once. The new serializer will be checked for its schema compatibility with the + * previous serializer's schema, and the compatibility result is returned to the caller. The caller is responsible for + * checking the result and reacting appropriately to it, as follows: + * <ul> + * <li>{@link TypeSerializerSchemaCompatibility#isCompatibleAsIs()}: nothing needs to be done. + * {@link #currentSchemaSerializer()} now returns the newly registered serializer.</li> + * <li>{@link TypeSerializerSchemaCompatibility#isCompatibleAfterMigration()}: state needs to be + * migrated before the serializer returned by {@link #currentSchemaSerializer()} can be used. + * The migration should be performed by reading the state with {@link #previousSchemaSerializer()}, + * and then writing it again with {@link #currentSchemaSerializer()}.</li> + * <li>{@link TypeSerializerSchemaCompatibility#isIncompatible()}: the registered serializer is + * incompatible.
{@link #currentSchemaSerializer()} can no longer return a serializer for + * the state, and therefore this provider shouldn't be used anymore.</li> + * </ul> + * + * @return the schema compatibility of the new registered serializer, with respect to the previous serializer. + */ + @Nonnull + public abstract TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer); + + /** + * Implementation of the {@link StateSerializerProvider} for the restored state case. + */ + private static class RestoredStateSerializerProvider<T> extends StateSerializerProvider<T> { + + /** + * The snapshot of the previous serializer of the state. + */ + @Nonnull + private final TypeSerializerSnapshot<T> previousSerializerSnapshot; + + private boolean isRegisteredWithIncompatibleSerializer = false; + + RestoredStateSerializerProvider(TypeSerializerSnapshot<T> previousSerializerSnapshot) { + super(null); + this.previousSerializerSnapshot = Preconditions.checkNotNull(previousSerializerSnapshot); + } + + /** + * The restore serializer, lazily created only when the restore serializer is accessed. + * + * <p>NOTE: It is important to only create this lazily, so that off-heap + * state do not fail eagerly when restoring state that has a + * {@link UnloadableDummyTypeSerializer} as the previous serializer. This should + * be relevant only for restores from Flink versions prior to 1.7.x. + */ + @Nullable + private TypeSerializer<T> cachedRestoredSerializer; + + @Override + @Nonnull + public TypeSerializer<T> currentSchemaSerializer() { + if (registeredSerializer != null) { + checkState( + !isRegisteredWithIncompatibleSerializer, + "Unable to provide a serializer with the current schema, because the restored state was " + + "registered with a new serializer that has incompatible schema."); + + return registeredSerializer; + } + + // if we are not yet registered with a new serializer, + // we can just use the restore serializer to read / write the state. 
+ return previousSchemaSerializer(); + } + + @Nonnull + public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer) { + checkNotNull(newSerializer); + if (registeredSerializer != null) { + throw new UnsupportedOperationException("A serializer has already been registered for the state; re-registration is not allowed."); + } + + TypeSerializerSchemaCompatibility<T> result = previousSerializerSnapshot.resolveSchemaCompatibility(newSerializer); + if (result.isIncompatible()) { + this.isRegisteredWithIncompatibleSerializer = true; + } + this.registeredSerializer = newSerializer; + return result; + } + + @Nonnull + public final TypeSerializer<T> previousSchemaSerializer() { + if (cachedRestoredSerializer != null) { + return cachedRestoredSerializer; + } + + this.cachedRestoredSerializer = previousSerializerSnapshot.restoreSerializer(); + return cachedRestoredSerializer; + } + } + + /** + * Implementation of the {@link StateSerializerProvider} for the new state case. 
+ */ + private static class NewStateSerializerProvider<T> extends StateSerializerProvider<T> { + + NewStateSerializerProvider(TypeSerializer<T> registeredStateSerializer) { + super(Preconditions.checkNotNull(registeredStateSerializer)); + } + + @Override + @Nonnull + @SuppressWarnings("ConstantConditions") + public TypeSerializer<T> currentSchemaSerializer() { + return registeredSerializer; + } + + @Override + @Nonnull + public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer) { + throw new UnsupportedOperationException("A serializer has already been registered for the state; re-registration is not allowed."); + } + + @Override + @Nonnull + public TypeSerializer<T> previousSchemaSerializer() { + throw new UnsupportedOperationException("This is a NewStateSerializerProvider; you cannot get a restore serializer because there was no restored state."); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 54cac1e..8708613 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -3145,6 +3145,57 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten backend.dispose(); } + @Test + public void testSnapshotNonAccessedState() throws Exception { + CheckpointStreamFactory streamFactory = createStreamFactory(); + SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); + AbstractKeyedStateBackend<String> backend = createKeyedBackend(StringSerializer.INSTANCE); + + final String stateName = "test-name"; + try { + MapStateDescriptor<Integer, String> kvId = new MapStateDescriptor<>(stateName, Integer.class, String.class); + MapState<Integer, String> mapState = backend + 
.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + // write some state to be snapshotted + backend.setCurrentKey("1"); + mapState.put(11, "foo"); + backend.setCurrentKey("2"); + mapState.put(8, "bar"); + backend.setCurrentKey("3"); + mapState.put(91, "hello world"); + + // take a snapshot, and then restore backend with snapshot + KeyedStateHandle snapshot = runSnapshot( + backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), + sharedStateRegistry); + backend.dispose(); + + backend = restoreKeyedBackend(StringSerializer.INSTANCE, snapshot); + + // now take a snapshot again without accessing the state + snapshot = runSnapshot( + backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), + sharedStateRegistry); + backend.dispose(); + + // we restore again and try to access previous state + backend = restoreKeyedBackend(StringSerializer.INSTANCE, snapshot); + mapState = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + backend.setCurrentKey("1"); + assertEquals("foo", mapState.get(11)); + backend.setCurrentKey("2"); + assertEquals("bar", mapState.get(8)); + backend.setCurrentKey("3"); + assertEquals("hello world", mapState.get(91)); + + snapshot.discardState(); + } finally { + backend.dispose(); + } + } + /** * This test verifies that state is correctly assigned to key groups and that restore * restores the relevant key groups in the backend. 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java new file mode 100644 index 0000000..de1f2bd --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.testutils.statemigration.TestType; + +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Test suite for {@link StateSerializerProvider}.
+ */ +public class StateSerializerProviderTest { + + // -------------------------------------------------------------------------------- + // Tests for #currentSchemaSerializer() + // -------------------------------------------------------------------------------- + + @Test + public void testCurrentSchemaSerializerForNewStateSerializerProvider() { + StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer()); + assertTrue(testProvider.currentSchemaSerializer() instanceof TestType.V1TestTypeSerializer); + } + + @Test + public void testCurrentSchemaSerializerForRestoredStateSerializerProvider() { + TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer(); + StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration()); + assertTrue(testProvider.currentSchemaSerializer() instanceof TestType.V1TestTypeSerializer); + } + + // -------------------------------------------------------------------------------- + // Tests for #previousSchemaSerializer() + // -------------------------------------------------------------------------------- + + @Test(expected = UnsupportedOperationException.class) + public void testPreviousSchemaSerializerForNewStateSerializerProvider() { + StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer()); + + // this should fail with an exception + testProvider.previousSchemaSerializer(); + } + + @Test + public void testPreviousSchemaSerializerForRestoredStateSerializerProvider() { + TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer(); + StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration()); + assertTrue(testProvider.previousSchemaSerializer() instanceof TestType.V1TestTypeSerializer); + } + + @Test + public void
testLazyInstantiationOfPreviousSchemaSerializer() { + // create the provider with an exception throwing snapshot; + // this would throw an exception if the restore serializer was eagerly accessed + StateSerializerProvider<String> testProvider = + StateSerializerProvider.fromRestoredState(new ExceptionThrowingSerializerSnapshot()); + + try { + // failing only here, and not at construction above, proves the restore serializer is accessed lazily + testProvider.previousSchemaSerializer(); + fail("expected to fail when accessing the restore serializer."); + } catch (UnsupportedOperationException expected) { + // success + } + } + + // -------------------------------------------------------------------------------- + // Tests for #registerNewSerializerForRestoredState(TypeSerializer) + // -------------------------------------------------------------------------------- + + @Test(expected = UnsupportedOperationException.class) + public void testRegisterNewSerializerWithNewStateSerializerProviderShouldFail() { + StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer()); + testProvider.registerNewSerializerForRestoredState(new TestType.V2TestTypeSerializer()); + } + + @Test(expected = UnsupportedOperationException.class) + public void testRegisterNewSerializerTwiceWithRestoredStateSerializerProviderShouldFail() { + TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer(); + StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration()); + + testProvider.registerNewSerializerForRestoredState(new TestType.V2TestTypeSerializer()); + + // second registration should fail + testProvider.registerNewSerializerForRestoredState(new TestType.V2TestTypeSerializer()); + } + + @Test + public void testRegisterNewCompatibleAsIsSerializer() { + TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer(); + StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration()); + + // register compatible serializer for state + TypeSerializerSchemaCompatibility<TestType> schemaCompatibility = + testProvider.registerNewSerializerForRestoredState(new TestType.V1TestTypeSerializer()); + assertTrue(schemaCompatibility.isCompatibleAsIs()); + + assertTrue(testProvider.currentSchemaSerializer() instanceof TestType.V1TestTypeSerializer); + assertTrue(testProvider.previousSchemaSerializer() instanceof TestType.V1TestTypeSerializer); + } + + @Test + public void testRegisterNewCompatibleAfterMigrationSerializer() { + TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer(); + StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration()); + + // register serializer that requires migration for state + TypeSerializerSchemaCompatibility<TestType> schemaCompatibility = + testProvider.registerNewSerializerForRestoredState(new TestType.V2TestTypeSerializer()); + assertTrue(schemaCompatibility.isCompatibleAfterMigration()); + } + + @Test + public void testRegisterIncompatibleSerializer() { + TestType.V1TestTypeSerializer serializer = new TestType.V1TestTypeSerializer(); + StateSerializerProvider<TestType> testProvider = StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration()); + + // register an incompatible serializer for state + TypeSerializerSchemaCompatibility<TestType> schemaCompatibility = + testProvider.registerNewSerializerForRestoredState(new TestType.IncompatibleTestTypeSerializer()); + assertTrue(schemaCompatibility.isIncompatible()); + + try { + // a serializer for the current schema will no longer be accessible + testProvider.currentSchemaSerializer(); + fail("expected to fail when accessing the current schema serializer after registering an incompatible serializer."); + } catch (IllegalStateException expected) { + // success + } + } + + // -------------------------------------------------------------------------------- + // Utilities + // 
-------------------------------------------------------------------------------- + + public static class ExceptionThrowingSerializerSnapshot implements TypeSerializerSnapshot<String> { + + @Override + public TypeSerializer<String> restoreSerializer() { + throw new UnsupportedOperationException(); + } + + @Override + public void writeSnapshot(DataOutputView out) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public TypeSerializerSchemaCompatibility<String> resolveSchemaCompatibility(TypeSerializer<String> newSerializer) { + throw new UnsupportedOperationException(); + } + + @Override + public int getCurrentVersion() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index d12370b..065213b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -29,7 +29,6 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.java.tuple.Tuple2; import 
org.apache.flink.configuration.ConfigConstants; @@ -1377,78 +1376,69 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { TypeSerializer<N> namespaceSerializer, @Nullable StateSnapshotTransformer<SV> snapshotTransformer) throws Exception { - Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo = + Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> oldStateInfo = kvStateInformation.get(stateDesc.getName()); TypeSerializer<SV> stateSerializer = stateDesc.getSerializer(); - RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( - stateDesc.getType(), - stateDesc.getName(), - namespaceSerializer, - stateSerializer, - snapshotTransformer); - - if (stateInfo != null) { - newMetaInfo = migrateStateIfNecessary( - newMetaInfo, + + ColumnFamilyHandle newColumnFamily; + RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo; + if (oldStateInfo != null) { + @SuppressWarnings("unchecked") + RegisteredKeyValueStateBackendMetaInfo<N, SV> castedMetaInfo = (RegisteredKeyValueStateBackendMetaInfo<N, SV>) oldStateInfo.f1; + + newMetaInfo = updateRestoredStateMetaInfo( + Tuple2.of(oldStateInfo.f0, castedMetaInfo), stateDesc, namespaceSerializer, stateSerializer, - stateInfo); + snapshotTransformer); - stateInfo.f1 = newMetaInfo; + oldStateInfo.f1 = newMetaInfo; + newColumnFamily = oldStateInfo.f0; } else { - ColumnFamilyHandle columnFamily = createColumnFamily(stateDesc.getName()); + newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( + stateDesc.getType(), + stateDesc.getName(), + namespaceSerializer, + stateSerializer, + snapshotTransformer); - stateInfo = Tuple2.of(columnFamily, newMetaInfo); - registerKvStateInformation(stateDesc.getName(), stateInfo); + newColumnFamily = createColumnFamily(stateDesc.getName()); + registerKvStateInformation(stateDesc.getName(), Tuple2.of(newColumnFamily, newMetaInfo)); } - return Tuple2.of(stateInfo.f0, newMetaInfo); + return 
Tuple2.of(newColumnFamily, newMetaInfo); } - private <N, S extends State, SV> RegisteredKeyValueStateBackendMetaInfo<N, SV> migrateStateIfNecessary( - RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo, + private <N, S extends State, SV> RegisteredKeyValueStateBackendMetaInfo<N, SV> updateRestoredStateMetaInfo( + Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> oldStateInfo, StateDescriptor<S, SV> stateDesc, TypeSerializer<N> namespaceSerializer, TypeSerializer<SV> stateSerializer, - Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo) throws Exception { - - StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName()); - - Preconditions.checkState( - restoredMetaInfoSnapshot != null, - "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + - " but its corresponding restored snapshot cannot be found."); + @Nullable StateSnapshotTransformer<SV> snapshotTransformer) throws Exception { @SuppressWarnings("unchecked") - TypeSerializerSnapshot<N> namespaceSerializerSnapshot = Preconditions.checkNotNull( - (TypeSerializerSnapshot<N>) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot( - StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString())); + RegisteredKeyValueStateBackendMetaInfo<N, SV> restoredKvStateMetaInfo = oldStateInfo.f1; + + restoredKvStateMetaInfo.updateSnapshotTransformer(snapshotTransformer); - TypeSerializerSchemaCompatibility<N> namespaceCompatibility = - namespaceSerializerSnapshot.resolveSchemaCompatibility(namespaceSerializer); - if (!namespaceCompatibility.isCompatibleAsIs()) { + TypeSerializerSchemaCompatibility<N> s = restoredKvStateMetaInfo.updateNamespaceSerializer(namespaceSerializer); + if (!s.isCompatibleAsIs()) { throw new StateMigrationException("The new namespace serializer must be compatible."); } - @SuppressWarnings("unchecked") - TypeSerializerSnapshot<SV> stateSerializerSnapshot = 
Preconditions.checkNotNull( - (TypeSerializerSnapshot<SV>) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot( - StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString())); - - RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(restoredMetaInfoSnapshot, stateDesc); + restoredKvStateMetaInfo.checkStateMetaInfo(stateDesc); - TypeSerializerSchemaCompatibility<SV> stateCompatibility = - stateSerializerSnapshot.resolveSchemaCompatibility(stateSerializer); - - if (stateCompatibility.isCompatibleAfterMigration()) { - migrateStateValues(stateDesc, stateInfo, restoredMetaInfoSnapshot, newMetaInfo, stateSerializer); - } else if (stateCompatibility.isIncompatible()) { + TypeSerializerSchemaCompatibility<SV> newStateSerializerCompatibility = + restoredKvStateMetaInfo.updateStateSerializer(stateSerializer); + if (newStateSerializerCompatibility.isCompatibleAfterMigration()) { + migrateStateValues(stateDesc, oldStateInfo); + } else if (newStateSerializerCompatibility.isIncompatible()) { throw new StateMigrationException("The new state serializer cannot be incompatible."); } - return newMetaInfo; + return restoredKvStateMetaInfo; } /** @@ -1458,10 +1448,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { */ private <N, S extends State, SV> void migrateStateValues( StateDescriptor<S, SV> stateDesc, - Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo, - StateMetaInfoSnapshot restoredMetaInfoSnapshot, - RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo, - TypeSerializer<SV> newStateSerializer) throws Exception { + Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> stateMetaInfo) throws Exception { if (stateDesc.getType() == StateDescriptor.Type.MAP) { throw new StateMigrationException("The new serializer for a MapState requires state migration in order for the job to proceed." 
+ @@ -1483,7 +1470,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } State state = stateFactory.createState( stateDesc, - Tuple2.of(stateInfo.f0, newMetaInfo), + stateMetaInfo, RocksDBKeyedStateBackend.this); if (!(state instanceof AbstractRocksDBState)) { throw new FlinkRuntimeException( @@ -1495,16 +1482,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { Snapshot rocksDBSnapshot = db.getSnapshot(); try ( - RocksIteratorWrapper iterator = getRocksIterator(db, stateInfo.f0); + RocksIteratorWrapper iterator = getRocksIterator(db, stateMetaInfo.f0); RocksDBWriteBatchWrapper batchWriter = new RocksDBWriteBatchWrapper(db, getWriteOptions()) ) { iterator.seekToFirst(); - @SuppressWarnings("unchecked") - TypeSerializerSnapshot<SV> priorValueSerializerSnapshot = (TypeSerializerSnapshot<SV>) - Preconditions.checkNotNull(restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)); - TypeSerializer<SV> priorValueSerializer = priorValueSerializerSnapshot.restoreSerializer(); - DataInputDeserializer serializedValueInput = new DataInputDeserializer(); DataOutputSerializer migratedSerializedValueOutput = new DataOutputSerializer(512); while (iterator.isValid()) { @@ -1513,10 +1495,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { rocksDBState.migrateSerializedValue( serializedValueInput, migratedSerializedValueOutput, - priorValueSerializer, - newStateSerializer); + stateMetaInfo.f1.getPreviousStateSerializer(), + stateMetaInfo.f1.getStateSerializer()); - batchWriter.put(stateInfo.f0, iterator.key(), migratedSerializedValueOutput.getCopyOfBuffer()); + batchWriter.put(stateMetaInfo.f0, iterator.key(), migratedSerializedValueOutput.getCopyOfBuffer()); migratedSerializedValueOutput.clear(); iterator.next(); @@ -1710,25 +1692,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // TODO 
we implement the simple way of supporting the current functionality, mimicking keyed state // because this should be reworked in FLINK-9376 and then we should have a common algorithm over // StateMetaInfoSnapshot that avoids this code duplication. - StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateName); - - Preconditions.checkState( - restoredMetaInfoSnapshot != null, - "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + - " but its corresponding restored snapshot cannot be found."); - StateMetaInfoSnapshot.CommonSerializerKeys serializerKey = - StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER; - - TypeSerializer<?> metaInfoTypeSerializer = restoredMetaInfoSnapshot.restoreTypeSerializer(serializerKey); + @SuppressWarnings("unchecked") + RegisteredPriorityQueueStateBackendMetaInfo<T> castedMetaInfo = + (RegisteredPriorityQueueStateBackendMetaInfo<T>) metaInfoTuple.f1; - if (metaInfoTypeSerializer != byteOrderedElementSerializer) { - @SuppressWarnings("unchecked") - TypeSerializerSnapshot<T> serializerSnapshot = Preconditions.checkNotNull( - (TypeSerializerSnapshot<T>) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey)); + TypeSerializer<T> previousElementSerializer = castedMetaInfo.getPreviousElementSerializer(); + if (previousElementSerializer != byteOrderedElementSerializer) { TypeSerializerSchemaCompatibility<T> compatibilityResult = - serializerSnapshot.resolveSchemaCompatibility(byteOrderedElementSerializer); + castedMetaInfo.updateElementSerializer(byteOrderedElementSerializer); // Since priority queue elements are written into RocksDB // as keys prefixed with the key group and namespace, we do not support