asfgit closed pull request #7329: [FLINK-11073] (part 1) Introduce
COMPATIBLE_WITH_RECONFIGURED_SERIALIZER option in
TypeSerializerSchemaCompatibility
URL: https://github.com/apache/flink/pull/7329
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
index d488799cfce..4bb4aa0d8a2 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
@@ -56,6 +56,12 @@
*/
COMPATIBLE_AFTER_MIGRATION,
+ /**
+ * This indicates that a reconfigured version of the new
serializer
+ * is compatible, and should be used instead of the original
new serializer.
+ */
+ COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
+
/**
* This indicates that the new serializer is incompatible, even
with migration.
* This normally implies that the deserialized Java class can
not be commonly recognized
@@ -69,6 +75,8 @@
*/
private final Type resultType;
+ private final TypeSerializer<T> reconfiguredNewSerializer;
+
/**
* Returns a result that indicates that the new serializer is
compatible and no migration is required.
* The new serializer can continued to be used as is.
@@ -89,6 +97,20 @@
return new
TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AFTER_MIGRATION, null);
}
+ /**
+ * Returns a result that indicates a reconfigured version of the new
serializer is compatible, and should be
+ * used instead of the original new serializer.
+ *
+ * @param reconfiguredSerializer the reconfigured version of the new
serializer.
+ * @return a result that indicates a reconfigured version of the new
serializer is compatible, and should be
+ * used instead of the original new serializer.
+ */
+ public static <T> TypeSerializerSchemaCompatibility<T>
compatibleWithReconfiguredSerializer(TypeSerializer<T> reconfiguredSerializer) {
+ return new TypeSerializerSchemaCompatibility<>(
+ Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
+ Preconditions.checkNotNull(reconfiguredSerializer));
+ }
+
/**
* Returns a result that indicates there is no possible way for the new
serializer to be use-able.
* This normally indicates that there is no common Java class between
what the previous bytes can be
@@ -105,6 +127,7 @@
private TypeSerializerSchemaCompatibility(Type resultType, @Nullable
TypeSerializer<T> reconfiguredNewSerializer) {
this.resultType = Preconditions.checkNotNull(resultType);
+ this.reconfiguredNewSerializer = reconfiguredNewSerializer;
}
/**
@@ -125,6 +148,27 @@ public boolean isCompatibleAfterMigration() {
return resultType == Type.COMPATIBLE_AFTER_MIGRATION;
}
+ /**
+ * Returns whether or not the type of the compatibility is {@link
Type#COMPATIBLE_WITH_RECONFIGURED_SERIALIZER}.
+ *
+ * @return whether or not the type of the compatibility is {@link
Type#COMPATIBLE_WITH_RECONFIGURED_SERIALIZER}.
+ */
+ public boolean isCompatibleWithReconfiguredSerializer() {
+ return resultType ==
Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER;
+ }
+
+ /**
+ * Gets the reconfigured serializer. This throws an exception if
+ * {@link #isCompatibleWithReconfiguredSerializer()} is {@code false}.
+ */
+ public TypeSerializer<T> getReconfiguredSerializer() {
+ Preconditions.checkState(
+ isCompatibleWithReconfiguredSerializer(),
+ "It is only possible to get a reconfigured serializer
if the compatibility type is %s, but the type is %s",
+ Type.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
resultType);
+ return reconfiguredNewSerializer;
+ }
+
/**
* Returns whether or not the type of the compatibility is {@link
Type#INCOMPATIBLE}.
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 19de2102a35..7422e654465 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -23,6 +23,8 @@
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -52,8 +54,8 @@
Closeable,
CheckpointListener {
- /** {@link TypeSerializer} for our key. */
- protected final TypeSerializer<K> keySerializer;
+ /** {@link StateSerializerProvider} for our key serializer. */
+ private final StateSerializerProvider<K> keySerializerProvider;
/** The currently active key. */
private K currentKey;
@@ -104,7 +106,7 @@ public AbstractKeyedStateBackend(
Preconditions.checkArgument(numberOfKeyGroups >=
keyGroupRange.getNumberOfKeyGroups(), "The total number of key groups must be
at least the number in the key group range assigned to this backend");
this.kvStateRegistry = kvStateRegistry;
- this.keySerializer = Preconditions.checkNotNull(keySerializer);
+ this.keySerializerProvider =
StateSerializerProvider.fromNewRegisteredSerializer(keySerializer);
this.numberOfKeyGroups = numberOfKeyGroups;
this.userCodeClassLoader =
Preconditions.checkNotNull(userCodeClassLoader);
this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
@@ -156,7 +158,13 @@ public void setCurrentKey(K newKey) {
*/
@Override
public TypeSerializer<K> getKeySerializer() {
- return keySerializer;
+ return keySerializerProvider.currentSchemaSerializer();
+ }
+
+ public TypeSerializerSchemaCompatibility<K>
checkKeySerializerSchemaCompatibility(
+ TypeSerializerSnapshot<K>
previousKeySerializerSnapshot) {
+
+ return
keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(previousKeySerializerSnapshot);
}
/**
@@ -230,7 +238,7 @@ public KeyGroupRange getKeyGroupRange() {
final TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, V> stateDescriptor) throws Exception
{
checkNotNull(namespaceSerializer, "Namespace serializer");
- checkNotNull(keySerializer, "State key serializer has not been
configured in the config. " +
+ checkNotNull(keySerializerProvider, "State key serializer has
not been configured in the config. " +
"This operation cannot use partitioned state.");
InternalKvState<K, ?, ?> kvState =
keyValueStatesByName.get(stateDescriptor.getName());
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index 585490dc75f..6de7c03e2cb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -64,7 +64,7 @@
// TODO the keySerializer field should be removed, once all serializers
have the restoreSerializer() method implemented
private TypeSerializer<K> keySerializer;
- private TypeSerializerSnapshot<K> keySerializerConfigSnapshot;
+ private TypeSerializerSnapshot<K> keySerializerSnapshot;
private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
@@ -82,7 +82,7 @@ public KeyedBackendSerializationProxy(
this.usingKeyGroupCompression = compression;
this.keySerializer = Preconditions.checkNotNull(keySerializer);
- this.keySerializerConfigSnapshot =
Preconditions.checkNotNull(keySerializer.snapshotConfiguration());
+ this.keySerializerSnapshot =
Preconditions.checkNotNull(keySerializer.snapshotConfiguration());
Preconditions.checkNotNull(stateMetaInfoSnapshots);
Preconditions.checkArgument(stateMetaInfoSnapshots.size() <=
Short.MAX_VALUE);
@@ -93,8 +93,8 @@ public KeyedBackendSerializationProxy(
return stateMetaInfoSnapshots;
}
- public TypeSerializerSnapshot<K> getKeySerializerConfigSnapshot() {
- return keySerializerConfigSnapshot;
+ public TypeSerializerSnapshot<K> getKeySerializerSnapshot() {
+ return keySerializerSnapshot;
}
public boolean isUsingKeyGroupCompression() {
@@ -118,7 +118,7 @@ public void write(DataOutputView out) throws IOException {
// write the compression format used to write each key-group
out.writeBoolean(usingKeyGroupCompression);
-
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out,
keySerializerConfigSnapshot, keySerializer);
+
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out,
keySerializerSnapshot, keySerializer);
// write individual registered keyed state metainfos
out.writeShort(stateMetaInfoSnapshots.size());
@@ -142,14 +142,14 @@ public void read(DataInputView in) throws IOException {
// only starting from version 3, we have the key serializer and
its config snapshot written
if (readVersion >= 6) {
- this.keySerializerConfigSnapshot =
TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+ this.keySerializerSnapshot =
TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
in, userCodeClassLoader, null);
} else if (readVersion >= 3) {
Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>
keySerializerAndConfig =
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in,
userCodeClassLoader).get(0);
- this.keySerializerConfigSnapshot =
(TypeSerializerSnapshot<K>) keySerializerAndConfig.f1;
+ this.keySerializerSnapshot =
(TypeSerializerSnapshot<K>) keySerializerAndConfig.f1;
} else {
- this.keySerializerConfigSnapshot = new
BackwardsCompatibleSerializerSnapshot<>(
+ this.keySerializerSnapshot = new
BackwardsCompatibleSerializerSnapshot<>(
TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader,
true));
}
this.keySerializer = null;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
index ecc13faa43d..e44559a2571 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
@@ -55,8 +55,8 @@ public RegisteredBroadcastStateBackendMetaInfo(
this(
name,
assignmentMode,
- StateSerializerProvider.fromNewState(keySerializer),
- StateSerializerProvider.fromNewState(valueSerializer));
+
StateSerializerProvider.fromNewRegisteredSerializer(keySerializer),
+
StateSerializerProvider.fromNewRegisteredSerializer(valueSerializer));
}
public RegisteredBroadcastStateBackendMetaInfo(@Nonnull
RegisteredBroadcastStateBackendMetaInfo<K, V> copy) {
@@ -73,10 +73,10 @@ public RegisteredBroadcastStateBackendMetaInfo(@Nonnull
StateMetaInfoSnapshot sn
snapshot.getName(),
OperatorStateHandle.Mode.valueOf(
snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
- StateSerializerProvider.fromRestoredState(
+ StateSerializerProvider.fromPreviousSerializerSnapshot(
(TypeSerializerSnapshot<K>)
Preconditions.checkNotNull(
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER))),
- StateSerializerProvider.fromRestoredState(
+ StateSerializerProvider.fromPreviousSerializerSnapshot(
(TypeSerializerSnapshot<V>)
Preconditions.checkNotNull(
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))));
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
index b37c79de026..b2d1cdc49ff 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
@@ -60,8 +60,8 @@ public RegisteredKeyValueStateBackendMetaInfo(
this(
stateType,
name,
-
StateSerializerProvider.fromNewState(namespaceSerializer),
- StateSerializerProvider.fromNewState(stateSerializer),
+
StateSerializerProvider.fromNewRegisteredSerializer(namespaceSerializer),
+
StateSerializerProvider.fromNewRegisteredSerializer(stateSerializer),
null);
}
@@ -75,8 +75,8 @@ public RegisteredKeyValueStateBackendMetaInfo(
this(
stateType,
name,
-
StateSerializerProvider.fromNewState(namespaceSerializer),
- StateSerializerProvider.fromNewState(stateSerializer),
+
StateSerializerProvider.fromNewRegisteredSerializer(namespaceSerializer),
+
StateSerializerProvider.fromNewRegisteredSerializer(stateSerializer),
snapshotTransformer);
}
@@ -85,10 +85,10 @@ public RegisteredKeyValueStateBackendMetaInfo(@Nonnull
StateMetaInfoSnapshot sna
this(
StateDescriptor.Type.valueOf(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
snapshot.getName(),
- StateSerializerProvider.fromRestoredState(
+ StateSerializerProvider.fromPreviousSerializerSnapshot(
(TypeSerializerSnapshot<N>)
Preconditions.checkNotNull(
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER))),
- StateSerializerProvider.fromRestoredState(
+ StateSerializerProvider.fromPreviousSerializerSnapshot(
(TypeSerializerSnapshot<S>)
Preconditions.checkNotNull(
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))),
null);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
index 921947a4dd0..6b83ca2693d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
@@ -56,7 +56,7 @@ public RegisteredOperatorStateBackendMetaInfo(
@Nonnull OperatorStateHandle.Mode assignmentMode) {
this(
name,
-
StateSerializerProvider.fromNewState(partitionStateSerializer),
+
StateSerializerProvider.fromNewRegisteredSerializer(partitionStateSerializer),
assignmentMode);
}
@@ -71,7 +71,7 @@ private RegisteredOperatorStateBackendMetaInfo(@Nonnull
RegisteredOperatorStateB
public RegisteredOperatorStateBackendMetaInfo(@Nonnull
StateMetaInfoSnapshot snapshot) {
this(
snapshot.getName(),
- StateSerializerProvider.fromRestoredState(
+ StateSerializerProvider.fromPreviousSerializerSnapshot(
(TypeSerializerSnapshot<S>)
Preconditions.checkNotNull(
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))),
OperatorStateHandle.Mode.valueOf(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
index 961d96fa405..691e74c0288 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
@@ -42,14 +42,14 @@ public RegisteredPriorityQueueStateBackendMetaInfo(
@Nonnull String name,
@Nonnull TypeSerializer<T> elementSerializer) {
- this(name,
StateSerializerProvider.fromNewState(elementSerializer));
+ this(name,
StateSerializerProvider.fromNewRegisteredSerializer(elementSerializer));
}
@SuppressWarnings("unchecked")
public
RegisteredPriorityQueueStateBackendMetaInfo(StateMetaInfoSnapshot snapshot) {
this(
snapshot.getName(),
- StateSerializerProvider.fromRestoredState(
+ StateSerializerProvider.fromPreviousSerializerSnapshot(
(TypeSerializerSnapshot<T>)
Preconditions.checkNotNull(
snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))));
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
index a24f12e42fb..bad37ff8707 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
@@ -35,6 +35,21 @@
* A {@link StateSerializerProvider} wraps logic on how to obtain serializers
for registered state,
* either with the previous schema of state in checkpoints or the current
schema of state.
*
+ * <p>A provider can be created from either a registered state serializer, or
the snapshot
+ * of the previous state serializer. For the former case, if the state was
restored and a
+ * snapshot of the previous state serializer was retrieved later on, the
snapshot can be set
+ * on the provider which also additionally checks the compatibility of the
initially registered
+ * serializer. Similarly for the latter case, if a new state serializer is
registered later on,
+ * it can be set on the provider, which then also checks the compatibility of
the new registered
+ * serializer.
+ *
+ * <p>Simply put, the provider works both directions - either creating it
first with a registered
+ * serializer or the previous serializer's snapshot, and then setting the
previous serializer's
+ * snapshot (if the provider was created with a registered serializer) or a
new registered state
+ * serializer (if the provider was created with a serializer snapshot). Either
way,
+ * the new registered serializer is checked for schema compatibility once both
the new serializer
+ * and the previous serializer snapshot are present.
+ *
* @param <T> the type of the state.
*/
@Internal
@@ -44,12 +59,36 @@
* The registered serializer for the state.
*
* <p>In the case that this provider was created from a restored
serializer snapshot via
- * {@link #fromRestoredState(TypeSerializerSnapshot)}, but a new
serializer was never registered
+ * {@link #fromPreviousSerializerSnapshot(TypeSerializerSnapshot)}, but
a new serializer was never registered
* for the state (i.e., this is the case if a restored state was never
accessed), this would be {@code null}.
*/
@Nullable
TypeSerializer<T> registeredSerializer;
+ /**
+ * The state's previous serializer's snapshot.
+ *
+ * <p>In the case that this provider was created from a registered
state serializer instance via
+ * {@link #fromNewRegisteredSerializer(TypeSerializer)}, but a
serializer snapshot was never supplied to this
+ * provider (i.e. because the registered serializer was for a new
state, not a restored one), this
+ * would be {@code null}.
+ */
+ @Nullable
+ TypeSerializerSnapshot<T> previousSerializerSnapshot;
+
+ /**
+ * The restore serializer, lazily created only when the restore
serializer is accessed.
+ *
+ * <p>NOTE: It is important to only create this lazily, so that off-heap
+ * state do not fail eagerly when restoring state that has a
+ * {@link UnloadableDummyTypeSerializer} as the previous serializer.
This should
+ * be relevant only for restores from Flink versions prior to 1.7.x.
+ */
+ @Nullable
+ private TypeSerializer<T> cachedRestoredSerializer;
+
+ private boolean isRegisteredWithIncompatibleSerializer = false;
+
/**
* Creates a {@link StateSerializerProvider} for restored state from
the previous serializer's snapshot.
*
@@ -59,26 +98,36 @@
* @param stateSerializerSnapshot the previous serializer's snapshot.
* @param <T> the type of the state.
*
- * @return a new {@link StateSerializerProvider} for restored state.
+ * @return a new {@link StateSerializerProvider}.
*/
- public static <T> StateSerializerProvider<T>
fromRestoredState(TypeSerializerSnapshot<T> stateSerializerSnapshot) {
- return new
RestoredStateSerializerProvider<>(stateSerializerSnapshot);
+ public static <T> StateSerializerProvider<T>
fromPreviousSerializerSnapshot(TypeSerializerSnapshot<T>
stateSerializerSnapshot) {
+ return new
LazilyRegisteredStateSerializerProvider<>(stateSerializerSnapshot);
}
/**
- * Creates a {@link StateSerializerProvider} for new state from the
registered state serializer.
+ * Creates a {@link StateSerializerProvider} from the registered state
serializer.
+ *
+ * <p>If the state is a restored one, and the previous serializer's
snapshot is
+ * obtained later on, it should be supplied via the
+ * {@link
#setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot)} method.
*
* @param registeredStateSerializer the new state's registered
serializer.
* @param <T> the type of the state.
*
- * @return a new {@link StateSerializerProvider} for new state.
+ * @return a new {@link StateSerializerProvider}.
*/
- public static <T> StateSerializerProvider<T>
fromNewState(TypeSerializer<T> registeredStateSerializer) {
- return new
NewStateSerializerProvider<>(registeredStateSerializer);
+ public static <T> StateSerializerProvider<T>
fromNewRegisteredSerializer(TypeSerializer<T> registeredStateSerializer) {
+ return new
EagerlyRegisteredStateSerializerProvider<>(registeredStateSerializer);
}
- private StateSerializerProvider(@Nullable TypeSerializer<T>
stateSerializer) {
+ private StateSerializerProvider(@Nonnull TypeSerializer<T>
stateSerializer) {
this.registeredSerializer = stateSerializer;
+ this.previousSerializerSnapshot = null;
+ }
+
+ private StateSerializerProvider(@Nonnull TypeSerializerSnapshot<T>
previousSerializerSnapshot) {
+ this.previousSerializerSnapshot = previousSerializerSnapshot;
+ this.registeredSerializer = null;
}
/**
@@ -92,36 +141,63 @@ private StateSerializerProvider(@Nullable
TypeSerializer<T> stateSerializer) {
* identical. Therefore, in this case, it is guaranteed that the
serializer returned by
* this method is the same as the one returned by {@link
#previousSchemaSerializer()}.
*
- * <p>If this provider was created from new state, then this always
returns the
- * serializer that the new state was registered with.
+ * <p>If this provider was created from a serializer instance, then
this always returns
+ * that same serializer instance. If later on a snapshot of the
previous serializer is supplied
+ * via {@link
#setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot)}, then
+ * the initially supplied serializer instance will be checked for
compatibility.
*
* @return a serializer that reads and writes in the current schema of
the state.
*/
@Nonnull
- public abstract TypeSerializer<T> currentSchemaSerializer();
+ public final TypeSerializer<T> currentSchemaSerializer() {
+ if (registeredSerializer != null) {
+ checkState(
+ !isRegisteredWithIncompatibleSerializer,
+ "Unable to provide a serializer with the
current schema, because the restored state was " +
+ "registered with a new serializer that
has incompatible schema.");
+
+ return registeredSerializer;
+ }
+
+ // if we are not yet registered with a new serializer,
+ // we can just use the restore serializer to read / write the
state.
+ return previousSchemaSerializer();
+ };
/**
* Gets the serializer that recognizes the previous serialization
schema of the state.
* This is the serializer that should be used for restoring the state,
i.e. when the state
* is still in the previous serialization schema.
*
- * <p>This method can only be used if this provider was created from a
restored state's serializer
- * snapshot. If this provider was created from new state, then this
method is
- * irrelevant, since there doesn't exist any previous version of the
state schema.
+ * <p>This method only returns a serializer if this provider has the
previous serializer's
+ * snapshot. Otherwise, trying to access the previous schema serializer
will fail
+ * with an exception.
*
* @return a serializer that reads and writes in the previous schema of
the state.
*/
@Nonnull
- public abstract TypeSerializer<T> previousSchemaSerializer();
+ public final TypeSerializer<T> previousSchemaSerializer() {
+ if (cachedRestoredSerializer != null) {
+ return cachedRestoredSerializer;
+ }
+
+ if (previousSerializerSnapshot == null) {
+ throw new UnsupportedOperationException(
+ "This provider does not contain the state's
previous serializer's snapshot. Cannot provider a serializer for previous
schema.");
+ }
+
+ this.cachedRestoredSerializer =
previousSerializerSnapshot.restoreSerializer();
+ return cachedRestoredSerializer;
+ };
/**
* For restored state, register a new serializer that potentially has a
new serialization schema.
*
* <p>Users are allowed to register serializers for state only once.
Therefore, this method
- * is irrelevant if this provider was created from new state, since a
state serializer had
+ * is irrelevant if this provider was created with a serializer
instance, since a state serializer had
* been registered already.
*
- * <p>For the case where this provider was created from restored state,
then this method should
+ * <p>For the case where this provider was created from a serializer
snapshot, then this method should
* be called at most once. The new serializer will be checked for its
schema compatibility with the
* previous serializer's schema, and returned to the caller. The caller
is responsible for
* checking the result and react appropriately to it, as follows:
@@ -143,52 +219,61 @@ private StateSerializerProvider(@Nullable
TypeSerializer<T> stateSerializer) {
public abstract TypeSerializerSchemaCompatibility<T>
registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer);
/**
- * Implementation of the {@link StateSerializerProvider} for the
restored state case.
+ * For restored state, set the state's previous serializer's snapshot.
+ *
+ * <p>Users are allowed to set the previous serializer's snapshot once.
Therefore, this method
+ * is irrelevant if this provider was created with a serializer
snapshot, since the serializer
+ * snapshot had been set already.
+ *
+ * <p>For the case where this provider was created from a serializer
instance, then this method should
+ * be called at most once. The initially registered state serializer
will be checked for its
+ * schema compatibility with the previous serializer's schema, and
returned to the caller.
+ * The caller is responsible for checking the result and reacting
appropriately to it, as follows:
+ * <ul>
+ * <li>{@link
TypeSerializerSchemaCompatibility#isCompatibleAsIs()}: nothing needs to be done.
+ * {@link #currentSchemaSerializer()} remains to return the
initially registered serializer.</li>
+ * <li>{@link
TypeSerializerSchemaCompatibility#isCompatibleAfterMigration()}: state
needs to be
+ * migrated before the serializer returned by {@link
#currentSchemaSerializer()} can be used.
+ * The migration should be performed by reading the state with
{@link #previousSchemaSerializer()},
+ * and then writing it again with {@link
#currentSchemaSerializer()}.</li>
+ * <li>{@link TypeSerializerSchemaCompatibility#isIncompatible()}:
the registered serializer is
+ * incompatible. {@link #currentSchemaSerializer()} can no longer
return a serializer for
+ * the state, and therefore this provider shouldn't be used
anymore.</li>
+ * </ul>
+ *
+ * @param previousSerializerSnapshot the state's previous serializer's
snapshot
+ *
+ * @return the schema compatibility of the initially registered
serializer, with respect to the previous serializer.
*/
- private static class RestoredStateSerializerProvider<T> extends
StateSerializerProvider<T> {
-
- /**
- * The snapshot of the previous serializer of the state.
- */
- @Nonnull
- private final TypeSerializerSnapshot<T>
previousSerializerSnapshot;
-
- private boolean isRegisteredWithIncompatibleSerializer = false;
-
- RestoredStateSerializerProvider(TypeSerializerSnapshot<T>
previousSerializerSnapshot) {
- super(null);
- this.previousSerializerSnapshot =
Preconditions.checkNotNull(previousSerializerSnapshot);
- }
-
- /**
- * The restore serializer, lazily created only when the restore
serializer is accessed.
- *
- * <p>NOTE: It is important to only create this lazily, so that
off-heap
- * state do not fail eagerly when restoring state that has a
- * {@link UnloadableDummyTypeSerializer} as the previous
serializer. This should
- * be relevant only for restores from Flink versions prior to
1.7.x.
- */
- @Nullable
- private TypeSerializer<T> cachedRestoredSerializer;
+ @Nonnull
+ public abstract TypeSerializerSchemaCompatibility<T>
setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot<T>
previousSerializerSnapshot);
- @Override
- @Nonnull
- public TypeSerializer<T> currentSchemaSerializer() {
- if (registeredSerializer != null) {
- checkState(
- !isRegisteredWithIncompatibleSerializer,
- "Unable to provide a serializer with
the current schema, because the restored state was " +
- "registered with a new
serializer that has incompatible schema.");
+ /**
+ * Invalidates access to the current schema serializer. This lets
{@link #currentSchemaSerializer()}
+ * fail when invoked.
+ *
+ * <p>Access to the current schema serializer should be invalidated by
the methods
+ * {@link #registerNewSerializerForRestoredState(TypeSerializer)} or
+ * {@link
#setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot)}
+ * once the registered serializer is determined to be incompatible.
+ */
+ protected final void invalidateCurrentSchemaSerializerAccess() {
+ this.isRegisteredWithIncompatibleSerializer = true;
+ }
- return registeredSerializer;
- }
+ /**
+ * Implementation of the {@link StateSerializerProvider} for the case
where a snapshot of the
+ * previous serializer is obtained before a new state serializer is
registered (hence, the naming "lazily" registered).
+ */
+ private static class LazilyRegisteredStateSerializerProvider<T> extends
StateSerializerProvider<T> {
- // if we are not yet registered with a new serializer,
- // we can just use the restore serializer to read /
write the state.
- return previousSchemaSerializer();
+
LazilyRegisteredStateSerializerProvider(TypeSerializerSnapshot<T>
previousSerializerSnapshot) {
+
super(Preconditions.checkNotNull(previousSerializerSnapshot));
}
@Nonnull
+ @Override
+ @SuppressWarnings("ConstantConditions")
public TypeSerializerSchemaCompatibility<T>
registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer) {
checkNotNull(newSerializer);
if (registeredSerializer != null) {
@@ -197,49 +282,59 @@ private StateSerializerProvider(@Nullable
TypeSerializer<T> stateSerializer) {
TypeSerializerSchemaCompatibility<T> result =
previousSerializerSnapshot.resolveSchemaCompatibility(newSerializer);
if (result.isIncompatible()) {
- this.isRegisteredWithIncompatibleSerializer =
true;
+ invalidateCurrentSchemaSerializerAccess();
+ }
+ if (result.isCompatibleWithReconfiguredSerializer()) {
+ this.registeredSerializer =
result.getReconfiguredSerializer();
+ } else {
+ this.registeredSerializer = newSerializer;
}
- this.registeredSerializer = newSerializer;
return result;
}
@Nonnull
- public final TypeSerializer<T> previousSchemaSerializer() {
- if (cachedRestoredSerializer != null) {
- return cachedRestoredSerializer;
- }
-
- this.cachedRestoredSerializer =
previousSerializerSnapshot.restoreSerializer();
- return cachedRestoredSerializer;
+ @Override
+ public TypeSerializerSchemaCompatibility<T>
setPreviousSerializerSnapshotForRestoredState(
+ TypeSerializerSnapshot<T>
previousSerializerSnapshot) {
+ throw new UnsupportedOperationException("The snapshot
of the state's previous serializer has already been set; cannot reset.");
}
}
/**
- * Implementation of the {@link StateSerializerProvider} for the new
state case.
+ * Implementation of the {@link StateSerializerProvider} for the case
where a new state
+ * serializer instance is registered first, before any snapshot of the
previous state serializer
+ * is obtained (hence, the naming "eagerly" registered).
*/
- private static class NewStateSerializerProvider<T> extends
StateSerializerProvider<T> {
+ private static class EagerlyRegisteredStateSerializerProvider<T>
extends StateSerializerProvider<T> {
- NewStateSerializerProvider(TypeSerializer<T>
registeredStateSerializer) {
+ EagerlyRegisteredStateSerializerProvider(TypeSerializer<T>
registeredStateSerializer) {
super(Preconditions.checkNotNull(registeredStateSerializer));
}
- @Override
@Nonnull
- @SuppressWarnings("ConstantConditions")
- public TypeSerializer<T> currentSchemaSerializer() {
- return registeredSerializer;
- }
-
@Override
- @Nonnull
public TypeSerializerSchemaCompatibility<T>
registerNewSerializerForRestoredState(TypeSerializer<T> newSerializer) {
throw new UnsupportedOperationException("A serializer
has already been registered for the state; re-registration is not allowed.");
}
- @Override
@Nonnull
- public TypeSerializer<T> previousSchemaSerializer() {
- throw new UnsupportedOperationException("This is a
NewStateSerializerProvider; you cannot get a restore serializer because there
was no restored state.");
+ @Override
+ public TypeSerializerSchemaCompatibility<T>
setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot<T>
previousSerializerSnapshot) {
+ checkNotNull(previousSerializerSnapshot);
+ if (this.previousSerializerSnapshot != null) {
+ throw new UnsupportedOperationException("The
snapshot of the state's previous serializer has already been set; cannot
reset.");
+ }
+
+ this.previousSerializerSnapshot =
previousSerializerSnapshot;
+
+ TypeSerializerSchemaCompatibility<T> result =
previousSerializerSnapshot.resolveSchemaCompatibility(registeredSerializer);
+ if (result.isIncompatible()) {
+ invalidateCurrentSchemaSerializerAccess();
+ }
+ if (result.isCompatibleWithReconfiguredSerializer()) {
+ this.registeredSerializer =
result.getReconfiguredSerializer();
+ }
+ return result;
}
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 3f8761b657a..2499898220c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -243,7 +243,7 @@ public HeapKeyedStateBackend(
TypeSerializerSchemaCompatibility<N>
namespaceCompatibility =
restoredKvMetaInfo.updateNamespaceSerializer(namespaceSerializer);
- if (!namespaceCompatibility.isCompatibleAsIs()) {
+ if (namespaceCompatibility.isCompatibleAfterMigration()
|| namespaceCompatibility.isIncompatible()) {
throw new StateMigrationException("For heap
backends, the new namespace serializer must be compatible.");
}
@@ -302,7 +302,7 @@ private boolean hasRegisteredState() {
}
StateTable<K, N, SV> stateTable = tryRegisterStateTable(
namespaceSerializer, stateDesc,
getStateSnapshotTransformer(stateDesc, snapshotTransformFactory));
- return stateFactory.createState(stateDesc, stateTable,
keySerializer);
+ return stateFactory.createState(stateDesc, stateTable,
getKeySerializer());
}
@SuppressWarnings("unchecked")
@@ -394,8 +394,9 @@ private void
restorePartitionedState(Collection<KeyedStateHandle> state) throws
if (!keySerializerRestored) {
// check for key serializer
compatibility; this also reconfigures the
// key serializer to be compatible, if
it is required and is possible
- if
(!serializationProxy.getKeySerializerConfigSnapshot()
-
.resolveSchemaCompatibility(keySerializer).isCompatibleAsIs()) {
+ TypeSerializerSchemaCompatibility<K>
keySerializerSchemaCompat =
+
checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerSnapshot());
+ if
(keySerializerSchemaCompat.isCompatibleAfterMigration() ||
keySerializerSchemaCompat.isIncompatible()) {
throw new
StateMigrationException("The new key serializer must be compatible.");
}
@@ -700,7 +701,7 @@ public boolean isAsynchronous() {
new KeyedBackendSerializationProxy<>(
// TODO: this code assumes that writing
a serializer is threadsafe, we should support to
// get a serialized form already at
state registration time in the future
- keySerializer,
+ getKeySerializer(),
metaInfoSnapshots,
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE,
keyGroupCompressionDecorator));
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 55aacb23057..7e497be3555 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -75,7 +75,7 @@ public void testKeyedBackendSerializationProxyRoundtrip()
throws Exception {
}
Assert.assertTrue(serializationProxy.isUsingKeyGroupCompression());
-
Assert.assertTrue(serializationProxy.getKeySerializerConfigSnapshot()
instanceof IntSerializer.IntSerializerSnapshot);
+ Assert.assertTrue(serializationProxy.getKeySerializerSnapshot()
instanceof IntSerializer.IntSerializerSnapshot);
assertEqualStateMetaInfoSnapshotsLists(stateMetaInfoList,
serializationProxy.getStateMetaInfoSnapshots());
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
index 5511792673e..8d85e74aca7 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
@@ -40,6 +40,7 @@
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -69,23 +70,70 @@
private CheckpointStorageLocation checkpointStorageLocation;
//
-------------------------------------------------------------------------------
- // Keyed state backend migration tests
+ // Tests for keyed ValueState
//
-------------------------------------------------------------------------------
@Test
public void testKeyedValueStateMigration() throws Exception {
+ final String stateName = "test-name";
+
+ testKeyedValueStateUpgrade(
+ new ValueStateDescriptor<>(
+ stateName,
+ new TestType.V1TestTypeSerializer()),
+ new ValueStateDescriptor<>(
+ stateName,
+ // restore with a V2 serializer that has a
different schema
+ new TestType.V2TestTypeSerializer()));
+ }
+
+ @Test
+ public void testKeyedValueStateSerializerReconfiguration() throws
Exception {
+ final String stateName = "test-name";
+
+ testKeyedValueStateUpgrade(
+ new ValueStateDescriptor<>(
+ stateName,
+ new TestType.V1TestTypeSerializer()),
+ new ValueStateDescriptor<>(
+ stateName,
+ // the test fails if this serializer is used
instead of a reconfigured new serializer
+ new
TestType.ReconfigurationRequiringTestTypeSerializer()));
+ }
+
+ @Test
+ public void
testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatible() throws
Exception {
+ final String stateName = "test-name";
+
+ try {
+ testKeyedValueStateUpgrade(
+ new ValueStateDescriptor<>(
+ stateName,
+ new TestType.V1TestTypeSerializer()),
+ new ValueStateDescriptor<>(
+ stateName,
+ new
TestType.IncompatibleTestTypeSerializer()));
+
+ Assert.fail("should have failed");
+ } catch (Exception expected) {
+
Assert.assertTrue(ExceptionUtils.findThrowable(expected,
StateMigrationException.class).isPresent());
+ }
+ }
+
+ private void testKeyedValueStateUpgrade(
+ ValueStateDescriptor<TestType> initialAccessDescriptor,
+ ValueStateDescriptor<TestType>
newAccessDescriptorAfterRestore) throws Exception {
+
CheckpointStreamFactory streamFactory = createStreamFactory();
SharedStateRegistry sharedStateRegistry = new
SharedStateRegistry();
- final String stateName = "test-name";
AbstractKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE);
try {
- ValueStateDescriptor<TestType> kvId = new
ValueStateDescriptor<>(
- stateName,
- new TestType.V1TestTypeSerializer());
- ValueState<TestType> valueState = backend
- .getPartitionedState(VoidNamespace.INSTANCE,
CustomVoidNamespaceSerializer.INSTANCE, kvId);
+ ValueState<TestType> valueState =
backend.getPartitionedState(
+ VoidNamespace.INSTANCE,
+ CustomVoidNamespaceSerializer.INSTANCE,
+ initialAccessDescriptor);
backend.setCurrentKey(1);
valueState.update(new TestType("foo", 1456));
@@ -101,16 +149,13 @@ public void testKeyedValueStateMigration() throws
Exception {
backend = restoreKeyedBackend(IntSerializer.INSTANCE,
snapshot);
- // the new serializer is V2, and has a completely new
serialization schema.
- kvId = new ValueStateDescriptor<>(
- stateName,
- new TestType.V2TestTypeSerializer());
- valueState = backend
- .getPartitionedState(VoidNamespace.INSTANCE,
CustomVoidNamespaceSerializer.INSTANCE, kvId);
+ valueState = backend.getPartitionedState(
+ VoidNamespace.INSTANCE,
+ CustomVoidNamespaceSerializer.INSTANCE,
+ newAccessDescriptorAfterRestore);
snapshot.discardState();
- // the state backend should have decided whether or not
it needs to perform state migration;
// make sure that reading and writing each key state
works with the new serializer
backend.setCurrentKey(1);
Assert.assertEquals(new TestType("foo", 1456),
valueState.value());
@@ -128,20 +173,72 @@ public void testKeyedValueStateMigration() throws
Exception {
}
}
+ //
-------------------------------------------------------------------------------
+ // Tests for keyed ListState
+ //
-------------------------------------------------------------------------------
+
@Test
public void testKeyedListStateMigration() throws Exception {
+ final String stateName = "test-name";
+
+ testKeyedListStateUpgrade(
+ new ListStateDescriptor<>(
+ stateName,
+ new TestType.V1TestTypeSerializer()),
+ new ListStateDescriptor<>(
+ stateName,
+ // restore with a V2 serializer that has a
different schema
+ new TestType.V2TestTypeSerializer()));
+ }
+
+ @Test
+ @Ignore("This currently doesn't pass because the ListSerializer doesn't
respect the reconfigured case, yet.")
+ public void testKeyedListStateSerializerReconfiguration() throws
Exception {
+ final String stateName = "test-name";
+
+ testKeyedListStateUpgrade(
+ new ListStateDescriptor<>(
+ stateName,
+ new TestType.V1TestTypeSerializer()),
+ new ListStateDescriptor<>(
+ stateName,
+ // the test fails if this serializer is used
instead of a reconfigured new serializer
+ new
TestType.ReconfigurationRequiringTestTypeSerializer()));
+ }
+
+ @Test
+ public void
testKeyedListStateRegistrationFailsIfNewStateSerializerIsIncompatible() throws
Exception {
+ final String stateName = "test-name";
+
+ try {
+ testKeyedListStateUpgrade(
+ new ListStateDescriptor<>(
+ stateName,
+ new TestType.V1TestTypeSerializer()),
+ new ListStateDescriptor<>(
+ stateName,
+ new
TestType.IncompatibleTestTypeSerializer()));
+
+ Assert.fail("should have failed");
+ } catch (Exception expected) {
+
Assert.assertTrue(ExceptionUtils.findThrowable(expected,
StateMigrationException.class).isPresent());
+ }
+ }
+
+ private void testKeyedListStateUpgrade(
+ ListStateDescriptor<TestType> initialAccessDescriptor,
+ ListStateDescriptor<TestType>
newAccessDescriptorAfterRestore) throws Exception {
+
CheckpointStreamFactory streamFactory = createStreamFactory();
SharedStateRegistry sharedStateRegistry = new
SharedStateRegistry();
- final String stateName = "test-name";
AbstractKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE);
try {
- ListStateDescriptor<TestType> kvId = new
ListStateDescriptor<>(
- stateName,
- new TestType.V1TestTypeSerializer());
- ListState<TestType> listState = backend
- .getPartitionedState(VoidNamespace.INSTANCE,
CustomVoidNamespaceSerializer.INSTANCE, kvId);
+ ListState<TestType> listState =
backend.getPartitionedState(
+ VoidNamespace.INSTANCE,
+ CustomVoidNamespaceSerializer.INSTANCE,
+ initialAccessDescriptor);
backend.setCurrentKey(1);
listState.add(new TestType("key-1", 1));
@@ -162,16 +259,13 @@ public void testKeyedListStateMigration() throws
Exception {
backend = restoreKeyedBackend(IntSerializer.INSTANCE,
snapshot);
- // the new serializer is V2, and has a completely new
serialization schema.
- kvId = new ListStateDescriptor<>(
- stateName,
- new TestType.V2TestTypeSerializer());
- listState = backend
- .getPartitionedState(VoidNamespace.INSTANCE,
CustomVoidNamespaceSerializer.INSTANCE, kvId);
+ listState = backend.getPartitionedState(
+ VoidNamespace.INSTANCE,
+ CustomVoidNamespaceSerializer.INSTANCE,
+ newAccessDescriptorAfterRestore);
snapshot.discardState();
- // the state backend should have decided whether or not
it needs to perform state migration;
// make sure that reading and writing each key state
works with the new serializer
backend.setCurrentKey(1);
Iterator<TestType> iterable1 =
listState.get().iterator();
@@ -198,27 +292,24 @@ public void testKeyedListStateMigration() throws
Exception {
}
}
+ //
-------------------------------------------------------------------------------
+ // Tests for keyed priority queue state
+ //
-------------------------------------------------------------------------------
+
@Test
- public void
testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatible() throws
Exception {
+ public void
testPriorityQueueStateCreationFailsIfNewSerializerIsNotCompatible() throws
Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
SharedStateRegistry sharedStateRegistry = new
SharedStateRegistry();
- final String stateName = "test-name";
AbstractKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE);
try {
- ValueStateDescriptor<TestType> kvId = new
ValueStateDescriptor<>(
- stateName,
- new TestType.V1TestTypeSerializer());
- ValueState<TestType> valueState = backend
- .getPartitionedState(VoidNamespace.INSTANCE,
CustomVoidNamespaceSerializer.INSTANCE, kvId);
+ InternalPriorityQueue<TestType> internalPriorityQueue =
backend.create(
+ "testPriorityQueue", new
TestType.V1TestTypeSerializer());
- backend.setCurrentKey(1);
- valueState.update(new TestType("foo", 1456));
- backend.setCurrentKey(2);
- valueState.update(new TestType("bar", 478));
- backend.setCurrentKey(3);
- valueState.update(new TestType("hello", 189));
+ internalPriorityQueue.add(new TestType("key-1", 123));
+ internalPriorityQueue.add(new TestType("key-2", 346));
+ internalPriorityQueue.add(new TestType("key-1", 777));
KeyedStateHandle snapshot = runSnapshot(
backend.snapshot(1L, 2L, streamFactory,
CheckpointOptions.forCheckpointWithDefaultLocation()),
@@ -226,110 +317,64 @@ public void
testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatib
backend.dispose();
backend = restoreKeyedBackend(IntSerializer.INSTANCE,
snapshot);
-
- kvId = new ValueStateDescriptor<>(
- stateName,
- new TestType.IncompatibleTestTypeSerializer());
-
- // the new serializer is INCOMPATIBLE, so registering
the state should fail
- backend.getPartitionedState(VoidNamespace.INSTANCE,
CustomVoidNamespaceSerializer.INSTANCE, kvId);
+ backend.create(
+ "testPriorityQueue", new
TestType.IncompatibleTestTypeSerializer());
Assert.fail("should have failed");
} catch (Exception e) {
Assert.assertTrue(ExceptionUtils.findThrowable(e,
StateMigrationException.class).isPresent());
- }finally {
+ } finally {
backend.dispose();
}
}
- @Test
- public void
testKeyedListStateRegistrationFailsIfNewStateSerializerIsIncompatible() throws
Exception {
- CheckpointStreamFactory streamFactory = createStreamFactory();
- SharedStateRegistry sharedStateRegistry = new
SharedStateRegistry();
-
- final String stateName = "test-name";
- AbstractKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE);
+ //
-------------------------------------------------------------------------------
+ // Tests for key serializer in keyed state backends
+ //
-------------------------------------------------------------------------------
+ @Test
+ public void
testStateBackendRestoreFailsIfNewKeySerializerRequiresMigration() throws
Exception {
try {
- ListStateDescriptor<TestType> kvId = new
ListStateDescriptor<>(
- stateName,
- new TestType.V1TestTypeSerializer());
- ListState<TestType> listState = backend
- .getPartitionedState(VoidNamespace.INSTANCE,
CustomVoidNamespaceSerializer.INSTANCE, kvId);
-
- backend.setCurrentKey(1);
- listState.add(new TestType("key-1", 1));
- listState.add(new TestType("key-1", 2));
- listState.add(new TestType("key-1", 3));
-
- backend.setCurrentKey(2);
- listState.add(new TestType("key-2", 1));
-
- backend.setCurrentKey(3);
- listState.add(new TestType("key-3", 1));
- listState.add(new TestType("key-3", 2));
-
- KeyedStateHandle snapshot = runSnapshot(
- backend.snapshot(1L, 2L, streamFactory,
CheckpointOptions.forCheckpointWithDefaultLocation()),
- sharedStateRegistry);
- backend.dispose();
-
- backend = restoreKeyedBackend(IntSerializer.INSTANCE,
snapshot);
-
- kvId = new ListStateDescriptor<>(
- stateName,
- new TestType.IncompatibleTestTypeSerializer());
-
- // the new serializer is INCOMPATIBLE, so registering
the state should fail
- backend.getPartitionedState(VoidNamespace.INSTANCE,
CustomVoidNamespaceSerializer.INSTANCE, kvId);
+ testKeySerializerUpgrade(
+ new TestType.V1TestTypeSerializer(),
+ new TestType.V2TestTypeSerializer());
Assert.fail("should have failed");
- } catch (Exception e) {
- Assert.assertTrue(ExceptionUtils.findThrowable(e,
StateMigrationException.class).isPresent());
- } finally {
- backend.dispose();
+ } catch (Exception expected) {
+ // the new key serializer requires migration; this
should fail the restore
+
Assert.assertTrue(ExceptionUtils.findThrowable(expected,
StateMigrationException.class).isPresent());
}
}
@Test
- public void
testPriorityQueueStateCreationFailsIfNewSerializerIsNotCompatible() throws
Exception {
- CheckpointStreamFactory streamFactory = createStreamFactory();
- SharedStateRegistry sharedStateRegistry = new
SharedStateRegistry();
-
- AbstractKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE);
+ public void
testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration()
throws Exception {
+ testKeySerializerUpgrade(
+ new TestType.V1TestTypeSerializer(),
+ new
TestType.ReconfigurationRequiringTestTypeSerializer());
+ }
+ @Test
+ public void
testStateBackendRestoreFailsIfNewKeySerializerIsIncompatible() throws Exception
{
try {
- InternalPriorityQueue<TestType> internalPriorityQueue =
backend.create(
- "testPriorityQueue", new
TestType.V1TestTypeSerializer());
-
- internalPriorityQueue.add(new TestType("key-1", 123));
- internalPriorityQueue.add(new TestType("key-2", 346));
- internalPriorityQueue.add(new TestType("key-1", 777));
-
- KeyedStateHandle snapshot = runSnapshot(
- backend.snapshot(1L, 2L, streamFactory,
CheckpointOptions.forCheckpointWithDefaultLocation()),
- sharedStateRegistry);
- backend.dispose();
-
- backend = restoreKeyedBackend(IntSerializer.INSTANCE,
snapshot);
- backend.create(
- "testPriorityQueue", new
TestType.IncompatibleTestTypeSerializer());
+ testKeySerializerUpgrade(
+ new TestType.V1TestTypeSerializer(),
+ new TestType.IncompatibleTestTypeSerializer());
Assert.fail("should have failed");
- } catch (Exception e) {
- Assert.assertTrue(ExceptionUtils.findThrowable(e,
StateMigrationException.class).isPresent());
- } finally {
- backend.dispose();
+ } catch (Exception expected) {
+ // the new key serializer is incompatible; this should
fail the restore
+
Assert.assertTrue(ExceptionUtils.findThrowable(expected,
StateMigrationException.class).isPresent());
}
}
- @Test
- public void
testStateBackendCreationFailsIfNewKeySerializerIsNotCompatible() throws
Exception {
+ private void testKeySerializerUpgrade(
+ TypeSerializer<TestType> initialKeySerializer,
+ TypeSerializer<TestType> newKeySerializer) throws
Exception {
+
CheckpointStreamFactory streamFactory = createStreamFactory();
SharedStateRegistry sharedStateRegistry = new
SharedStateRegistry();
- AbstractKeyedStateBackend<TestType> backend =
createKeyedBackend(
- new TestType.V1TestTypeSerializer());
+ AbstractKeyedStateBackend<TestType> backend =
createKeyedBackend(initialKeySerializer);
final String stateName = "test-name";
try {
@@ -347,30 +392,66 @@ public void
testStateBackendCreationFailsIfNewKeySerializerIsNotCompatible() thr
sharedStateRegistry);
backend.dispose();
- try {
- // the new key serializer is incompatible; this
should fail the restore
- restoreKeyedBackend(new
TestType.IncompatibleTestTypeSerializer(), snapshot);
+ backend = restoreKeyedBackend(newKeySerializer,
snapshot);
- Assert.fail("should have failed");
- } catch (Exception e) {
-
Assert.assertTrue(ExceptionUtils.findThrowable(e,
StateMigrationException.class).isPresent());
- }
+ valueState = backend
+ .getPartitionedState(VoidNamespace.INSTANCE,
CustomVoidNamespaceSerializer.INSTANCE, kvId);
- try {
- // the new key serializer requires migration;
this should fail the restore
- restoreKeyedBackend(new
TestType.V2TestTypeSerializer(), snapshot);
+ // access and check previous state
+ backend.setCurrentKey(new TestType("foo", 123));
+ Assert.assertEquals(1, valueState.value().intValue());
+ backend.setCurrentKey(new TestType("bar", 456));
+ Assert.assertEquals(5, valueState.value().intValue());
- Assert.fail("should have failed");
- } catch (Exception e) {
-
Assert.assertTrue(ExceptionUtils.findThrowable(e,
StateMigrationException.class).isPresent());
- }
+ snapshot.discardState();
} finally {
backend.dispose();
}
}
+ //
-------------------------------------------------------------------------------
+ // Tests for namespace serializer in keyed state backends
+ //
-------------------------------------------------------------------------------
+
@Test
- public void
testKeyedStateRegistrationFailsIfNewNamespaceSerializerIsNotCompatible() throws
Exception {
+ public void
testKeyedStateRegistrationFailsIfNewNamespaceSerializerRequiresMigration()
throws Exception {
+ try {
+ testNamespaceSerializerUpgrade(
+ new TestType.V1TestTypeSerializer(),
+ new TestType.V2TestTypeSerializer());
+
+ Assert.fail("should have failed");
+ } catch (Exception expected) {
+ // the new namespace serializer requires migration;
this should fail the restore
+
Assert.assertTrue(ExceptionUtils.findThrowable(expected,
StateMigrationException.class).isPresent());
+ }
+ }
+
+ @Test
+ public void
testKeyedStateRegistrationSucceedsIfNewNamespaceSerializerRequiresReconfiguration()
throws Exception {
+ testNamespaceSerializerUpgrade(
+ new TestType.V1TestTypeSerializer(),
+ new
TestType.ReconfigurationRequiringTestTypeSerializer());
+ }
+
+ @Test
+ public void
testKeyedStateRegistrationFailsIfNewNamespaceSerializerIsIncompatible() throws
Exception {
+ try {
+ testNamespaceSerializerUpgrade(
+ new TestType.V1TestTypeSerializer(),
+ new TestType.IncompatibleTestTypeSerializer());
+
+ Assert.fail("should have failed");
+ } catch (Exception expected) {
+ // the new namespace serializer is incompatible; this
should fail the restore
+
Assert.assertTrue(ExceptionUtils.findThrowable(expected,
StateMigrationException.class).isPresent());
+ }
+ }
+
+ private void testNamespaceSerializerUpgrade(
+ TypeSerializer<TestType> initialNamespaceSerializer,
+ TypeSerializer<TestType>
newNamespaceSerializerAfterRestore) throws Exception {
+
CheckpointStreamFactory streamFactory = createStreamFactory();
SharedStateRegistry sharedStateRegistry = new
SharedStateRegistry();
@@ -382,7 +463,7 @@ public void
testKeyedStateRegistrationFailsIfNewNamespaceSerializerIsNotCompatib
ValueState<Integer> valueState = backend
.getPartitionedState(
new TestType("namespace", 123),
- new TestType.V1TestTypeSerializer(),
+ initialNamespaceSerializer,
kvId);
backend.setCurrentKey(1);
@@ -397,53 +478,87 @@ public void
testKeyedStateRegistrationFailsIfNewNamespaceSerializerIsNotCompatib
// test incompatible namespace serializer; start with a
freshly restored backend
backend.dispose();
backend = restoreKeyedBackend(IntSerializer.INSTANCE,
snapshot);
- try {
- // the new namespace serializer is
incompatible; this should fail the restore
- backend.getPartitionedState(
- new TestType("namespace", 123),
- new
TestType.IncompatibleTestTypeSerializer(),
- kvId);
- Assert.fail("should have failed");
- } catch (Exception e) {
-
Assert.assertTrue(ExceptionUtils.findThrowable(e,
StateMigrationException.class).isPresent());
- }
+ valueState = backend.getPartitionedState(
+ new TestType("namespace", 123),
+ newNamespaceSerializerAfterRestore,
+ kvId);
- // test namespace serializer that requires migration;
start with a freshly restored backend
- backend.dispose();
- backend = restoreKeyedBackend(IntSerializer.INSTANCE,
snapshot);
- try {
- // the new namespace serializer requires
migration; this should fail the restore
- backend.getPartitionedState(
- new TestType("namespace", 123),
- new TestType.V2TestTypeSerializer(),
- kvId);
+ // access and check previous state
+ backend.setCurrentKey(1);
+ Assert.assertEquals(10, valueState.value().intValue());
+ valueState.update(10);
+ backend.setCurrentKey(5);
+ Assert.assertEquals(50, valueState.value().intValue());
- Assert.fail("should have failed");
- } catch (Exception e) {
-
Assert.assertTrue(ExceptionUtils.findThrowable(e,
StateMigrationException.class).isPresent());
- }
+ snapshot.discardState();
} finally {
backend.dispose();
}
}
//
-------------------------------------------------------------------------------
- // Operator state backend migration tests
+ // Operator state backend partitionable list state tests
//
-------------------------------------------------------------------------------
@Test
public void testOperatorParitionableListStateMigration() throws
Exception {
+ final String stateName = "partitionable-list-state";
+
+ testOperatorPartitionableListStateUpgrade(
+ new ListStateDescriptor<>(
+ stateName,
+ new TestType.V1TestTypeSerializer()),
+ new ListStateDescriptor<>(
+ stateName,
+ // restore with a V2 serializer that has a
different schema
+ new TestType.V2TestTypeSerializer()));
+ }
+
+ @Test
+ public void
testOperatorParitionableListStateSerializerReconfiguration() throws Exception {
+ final String stateName = "partitionable-list-state";
+
+ testOperatorPartitionableListStateUpgrade(
+ new ListStateDescriptor<>(
+ stateName,
+ new TestType.V1TestTypeSerializer()),
+ new ListStateDescriptor<>(
+ stateName,
+ // restore with a new serializer that requires
reconfiguration
+ new
TestType.ReconfigurationRequiringTestTypeSerializer()));
+ }
+
+ @Test
+ public void
testOperatorParitionableListStateRegistrationFailsIfNewSerializerIsIncompatible()
throws Exception {
+ final String stateName = "partitionable-list-state";
+
+ try {
+ testOperatorPartitionableListStateUpgrade(
+ new ListStateDescriptor<>(
+ stateName,
+ new TestType.V1TestTypeSerializer()),
+ new ListStateDescriptor<>(
+ stateName,
+ // restore with a new incompatible
serializer
+ new
TestType.IncompatibleTestTypeSerializer()));
+
+ Assert.fail("should have failed.");
+ } catch (Exception e) {
+ Assert.assertTrue(ExceptionUtils.findThrowable(e,
StateMigrationException.class).isPresent());
+ }
+ }
+
+ private void testOperatorPartitionableListStateUpgrade(
+ ListStateDescriptor<TestType> initialAccessDescriptor,
+ ListStateDescriptor<TestType>
newAccessDescriptorAfterRestore) throws Exception {
+
CheckpointStreamFactory streamFactory = createStreamFactory();
OperatorStateBackend backend = createOperatorStateBackend();
- final String stateName = "partitionable-list-state";
try {
- ListStateDescriptor<TestType> descriptor = new
ListStateDescriptor<>(
- stateName,
- new TestType.V1TestTypeSerializer());
- ListState<TestType> state =
backend.getListState(descriptor);
+ ListState<TestType> state =
backend.getListState(initialAccessDescriptor);
state.add(new TestType("foo", 13));
state.add(new TestType("bar", 278));
@@ -454,12 +569,8 @@ public void testOperatorParitionableListStateMigration()
throws Exception {
backend = restoreOperatorStateBackend(snapshot);
- descriptor = new ListStateDescriptor<>(
- stateName,
- new TestType.V2TestTypeSerializer());
- state = backend.getListState(descriptor);
+ state =
backend.getListState(newAccessDescriptorAfterRestore);
- // the state backend should have decided whether or not
it needs to perform state migration;
// make sure that reading and writing each state
partition works with the new serializer
Iterator<TestType> iterator = state.get().iterator();
Assert.assertEquals(new TestType("foo", 13),
iterator.next());
@@ -471,18 +582,69 @@ public void testOperatorParitionableListStateMigration()
throws Exception {
}
}
+ //
-------------------------------------------------------------------------------
+ // Operator state backend union list state tests
+ //
-------------------------------------------------------------------------------
+
+ @Test
+ public void testOperatorUnionListStateMigration() throws Exception {
+ final String stateName = "union-list-state";
+
+ testOperatorUnionListStateUpgrade(
+ new ListStateDescriptor<>(
+ stateName,
+ new TestType.V1TestTypeSerializer()),
+ new ListStateDescriptor<>(
+ stateName,
+ // restore with a V2 serializer that has a
different schema
+ new TestType.V2TestTypeSerializer()));
+ }
+
@Test
- public void testUnionListStateMigration() throws Exception {
+ public void testOperatorUnionListStateSerializerReconfiguration()
throws Exception {
+ final String stateName = "union-list-state";
+
+ testOperatorUnionListStateUpgrade(
+ new ListStateDescriptor<>(
+ stateName,
+ new TestType.V1TestTypeSerializer()),
+ new ListStateDescriptor<>(
+ stateName,
+ // restore with a new serializer that requires
reconfiguration
+ new
TestType.ReconfigurationRequiringTestTypeSerializer()));
+ }
+
+
+ @Test
+ public void
testOperatorUnionListStateRegistrationFailsIfNewSerializerIsIncompatible()
throws Exception {
+ final String stateName = "union-list-state";
+
+ try {
+ testOperatorUnionListStateUpgrade(
+ new ListStateDescriptor<>(
+ stateName,
+ new TestType.V1TestTypeSerializer()),
+ new ListStateDescriptor<>(
+ stateName,
+ // restore with a new incompatible
serializer
+ new
TestType.IncompatibleTestTypeSerializer()));
+
+ Assert.fail("should have failed.");
+ } catch (Exception e) {
+ Assert.assertTrue(ExceptionUtils.findThrowable(e,
StateMigrationException.class).isPresent());
+ }
+ }
+
+ private void testOperatorUnionListStateUpgrade(
+ ListStateDescriptor<TestType> initialAccessDescriptor,
+ ListStateDescriptor<TestType>
newAccessDescriptorAfterRestore) throws Exception {
+
CheckpointStreamFactory streamFactory = createStreamFactory();
OperatorStateBackend backend = createOperatorStateBackend();
- final String stateName = "union-list-state";
try {
- ListStateDescriptor<TestType> descriptor = new
ListStateDescriptor<>(
- stateName,
- new TestType.V1TestTypeSerializer());
- ListState<TestType> state =
backend.getUnionListState(descriptor);
+ ListState<TestType> state =
backend.getUnionListState(initialAccessDescriptor);
state.add(new TestType("foo", 13));
state.add(new TestType("bar", 278));
@@ -493,10 +655,7 @@ public void testUnionListStateMigration() throws Exception
{
backend = restoreOperatorStateBackend(snapshot);
- descriptor = new ListStateDescriptor<>(
- stateName,
- new TestType.V2TestTypeSerializer());
- state = backend.getUnionListState(descriptor);
+ state =
backend.getUnionListState(newAccessDescriptorAfterRestore);
// the state backend should have decided whether or not
it needs to perform state migration;
// make sure that reading and writing each state
partition works with the new serializer
@@ -510,171 +669,128 @@ public void testUnionListStateMigration() throws
Exception {
}
}
+ //
-------------------------------------------------------------------------------
+ // Operator state backend broadcast state tests
+ //
-------------------------------------------------------------------------------
+
@Test
public void testBroadcastStateValueMigration() throws Exception {
- CheckpointStreamFactory streamFactory = createStreamFactory();
-
- OperatorStateBackend backend = createOperatorStateBackend();
-
final String stateName = "broadcast-state";
- try {
- MapStateDescriptor<Integer, TestType> descriptor = new
MapStateDescriptor<>(
+
+ testBroadcastStateValueUpgrade(
+ new MapStateDescriptor<>(
stateName,
IntSerializer.INSTANCE,
- new TestType.V1TestTypeSerializer());
- BroadcastState<Integer, TestType> state =
backend.getBroadcastState(descriptor);
-
- state.put(3, new TestType("foo", 13));
- state.put(5, new TestType("bar", 278));
-
- OperatorStateHandle snapshot = runSnapshot(
- backend.snapshot(1L, 2L, streamFactory,
CheckpointOptions.forCheckpointWithDefaultLocation()));
- backend.dispose();
-
- backend = restoreOperatorStateBackend(snapshot);
-
- descriptor = new MapStateDescriptor<>(
+ new TestType.V1TestTypeSerializer()),
+ new MapStateDescriptor<>(
stateName,
IntSerializer.INSTANCE,
- new TestType.V2TestTypeSerializer());
- state = backend.getBroadcastState(descriptor);
-
- // the state backend should have decided whether or not
it needs to perform state migration;
- // make sure that reading and writing each broadcast
entry works with the new serializer
- Assert.assertEquals(new TestType("foo", 13),
state.get(3));
- Assert.assertEquals(new TestType("bar", 278),
state.get(5));
- state.put(17, new TestType("new-entry", 777));
- } finally {
- backend.dispose();
- }
+ // new value serializer is a V2 serializer with
a different schema
+ new TestType.V2TestTypeSerializer()));
}
+
@Test
public void testBroadcastStateKeyMigration() throws Exception {
- CheckpointStreamFactory streamFactory = createStreamFactory();
-
- OperatorStateBackend backend = createOperatorStateBackend();
-
final String stateName = "broadcast-state";
- try {
- MapStateDescriptor<TestType, Integer> descriptor = new
MapStateDescriptor<>(
+
+ testBroadcastStateKeyUpgrade(
+ new MapStateDescriptor<>(
stateName,
new TestType.V1TestTypeSerializer(),
- IntSerializer.INSTANCE);
- BroadcastState<TestType, Integer> state =
backend.getBroadcastState(descriptor);
-
- state.put(new TestType("foo", 13), 3);
- state.put(new TestType("bar", 278), 5);
-
- OperatorStateHandle snapshot = runSnapshot(
- backend.snapshot(1L, 2L, streamFactory,
CheckpointOptions.forCheckpointWithDefaultLocation()));
- backend.dispose();
-
- backend = restoreOperatorStateBackend(snapshot);
-
- descriptor = new MapStateDescriptor<>(
+ IntSerializer.INSTANCE),
+ new MapStateDescriptor<>(
stateName,
+ // new key serializer is a V2 serializer with a
different schema
new TestType.V2TestTypeSerializer(),
- IntSerializer.INSTANCE);
- state = backend.getBroadcastState(descriptor);
-
- // the state backend should have decided whether or not
it needs to perform state migration;
- // make sure that reading and writing each broadcast
entry works with the new serializer
- Assert.assertEquals((Integer) 3, state.get(new
TestType("foo", 13)));
- Assert.assertEquals((Integer) 5, state.get(new
TestType("bar", 278)));
- state.put(new TestType("new-entry", 777), 17);
- } finally {
- backend.dispose();
- }
+ IntSerializer.INSTANCE));
}
@Test
- public void
testOperatorParitionableListStateRegistrationFailsIfNewSerializerIsIncompatible()
throws Exception {
- CheckpointStreamFactory streamFactory = createStreamFactory();
-
- OperatorStateBackend backend = createOperatorStateBackend();
+ public void testBroadcastStateValueSerializerReconfiguration() throws
Exception {
+ final String stateName = "broadcast-state";
- final String stateName = "partitionable-list-state";
- try {
- ListStateDescriptor<TestType> descriptor = new
ListStateDescriptor<>(
+ testBroadcastStateValueUpgrade(
+ new MapStateDescriptor<>(
stateName,
- new TestType.V1TestTypeSerializer());
- ListState<TestType> state =
backend.getListState(descriptor);
-
- state.add(new TestType("foo", 13));
- state.add(new TestType("bar", 278));
-
- OperatorStateHandle snapshot = runSnapshot(
- backend.snapshot(1L, 2L, streamFactory,
CheckpointOptions.forCheckpointWithDefaultLocation()));
- backend.dispose();
+ IntSerializer.INSTANCE,
+ new TestType.V1TestTypeSerializer()),
+ new MapStateDescriptor<>(
+ stateName,
+ IntSerializer.INSTANCE,
+ // new value serializer is a new serializer
that requires reconfiguration
+ new
TestType.ReconfigurationRequiringTestTypeSerializer()));
+ }
- backend = restoreOperatorStateBackend(snapshot);
+ @Test
+ public void testBroadcastStateKeySerializerReconfiguration() throws
Exception {
+ final String stateName = "broadcast-state";
- descriptor = new ListStateDescriptor<>(
+ testBroadcastStateKeyUpgrade(
+ new MapStateDescriptor<>(
stateName,
- new TestType.IncompatibleTestTypeSerializer());
+ new TestType.V1TestTypeSerializer(),
+ IntSerializer.INSTANCE),
+ new MapStateDescriptor<>(
+ stateName,
+ // new key serializer is a new serializer that
requires reconfiguration
+ new
TestType.ReconfigurationRequiringTestTypeSerializer(),
+ IntSerializer.INSTANCE));
+ }
- // the new serializer is INCOMPATIBLE, so registering
the state should fail
- backend.getListState(descriptor);
+ @Test
+ public void
testBroadcastStateRegistrationFailsIfNewValueSerializerIsIncompatible() throws
Exception {
+ final String stateName = "broadcast-state";
+
+ try {
+ testBroadcastStateValueUpgrade(
+ new MapStateDescriptor<>(
+ stateName,
+ IntSerializer.INSTANCE,
+ new TestType.V1TestTypeSerializer()),
+ new MapStateDescriptor<>(
+ stateName,
+ IntSerializer.INSTANCE,
+ // new value serializer is incompatible
+ new
TestType.IncompatibleTestTypeSerializer()));
Assert.fail("should have failed.");
} catch (Exception e) {
Assert.assertTrue(ExceptionUtils.findThrowable(e,
StateMigrationException.class).isPresent());
- } finally {
- backend.dispose();
}
}
@Test
- public void
testUnionListStateRegistrationFailsIfNewSerializerIsIncompatible() throws
Exception {
- CheckpointStreamFactory streamFactory = createStreamFactory();
-
- OperatorStateBackend backend = createOperatorStateBackend();
+ public void
testBroadcastStateRegistrationFailsIfNewKeySerializerIsIncompatible() throws
Exception {
+ final String stateName = "broadcast-state";
- final String stateName = "union-list-state";
try {
- ListStateDescriptor<TestType> descriptor = new
ListStateDescriptor<>(
- stateName,
- new TestType.V1TestTypeSerializer());
- ListState<TestType> state =
backend.getUnionListState(descriptor);
-
- state.add(new TestType("foo", 13));
- state.add(new TestType("bar", 278));
-
- OperatorStateHandle snapshot = runSnapshot(
- backend.snapshot(1L, 2L, streamFactory,
CheckpointOptions.forCheckpointWithDefaultLocation()));
- backend.dispose();
-
- backend = restoreOperatorStateBackend(snapshot);
-
- descriptor = new ListStateDescriptor<>(
- stateName,
- new TestType.IncompatibleTestTypeSerializer());
-
- // the new serializer is INCOMPATIBLE, so registering
the state should fail
- backend.getUnionListState(descriptor);
+ testBroadcastStateKeyUpgrade(
+ new MapStateDescriptor<>(
+ stateName,
+ new TestType.V1TestTypeSerializer(),
+ IntSerializer.INSTANCE),
+ new MapStateDescriptor<>(
+ stateName,
+ // new key serializer is incompatible
+ new
TestType.IncompatibleTestTypeSerializer(),
+ IntSerializer.INSTANCE));
Assert.fail("should have failed.");
} catch (Exception e) {
Assert.assertTrue(ExceptionUtils.findThrowable(e,
StateMigrationException.class).isPresent());
- } finally {
- backend.dispose();
}
}
- @Test
- public void
testBroadcastStateRegistrationFailsIfNewValueSerializerIsIncompatible() throws
Exception {
+ private void testBroadcastStateValueUpgrade(
+ MapStateDescriptor<Integer, TestType>
initialAccessDescriptor,
+ MapStateDescriptor<Integer, TestType>
newAccessDescriptorAfterRestore) throws Exception {
CheckpointStreamFactory streamFactory = createStreamFactory();
OperatorStateBackend backend = createOperatorStateBackend();
- final String stateName = "broadcast-state";
try {
- MapStateDescriptor<Integer, TestType> descriptor = new
MapStateDescriptor<>(
- stateName,
- IntSerializer.INSTANCE,
- new TestType.V1TestTypeSerializer());
- BroadcastState<Integer, TestType> state =
backend.getBroadcastState(descriptor);
+ BroadcastState<Integer, TestType> state =
backend.getBroadcastState(initialAccessDescriptor);
state.put(3, new TestType("foo", 13));
state.put(5, new TestType("bar", 278));
@@ -685,35 +801,28 @@ public void
testBroadcastStateRegistrationFailsIfNewValueSerializerIsIncompatibl
backend = restoreOperatorStateBackend(snapshot);
- descriptor = new MapStateDescriptor<>(
- stateName,
- IntSerializer.INSTANCE,
- new TestType.IncompatibleTestTypeSerializer());
+ state =
backend.getBroadcastState(newAccessDescriptorAfterRestore);
- // the new value serializer is INCOMPATIBLE, so
registering the state should fail
- backend.getBroadcastState(descriptor);
-
- Assert.fail("should have failed.");
- } catch (Exception e) {
- Assert.assertTrue(ExceptionUtils.findThrowable(e,
StateMigrationException.class).isPresent());
+ // the state backend should have decided whether or not
it needs to perform state migration;
+ // make sure that reading and writing each broadcast
entry works with the new serializer
+ Assert.assertEquals(new TestType("foo", 13),
state.get(3));
+ Assert.assertEquals(new TestType("bar", 278),
state.get(5));
+ state.put(17, new TestType("new-entry", 777));
} finally {
backend.dispose();
}
}
- @Test
- public void
testBroadcastStateRegistrationFailsIfNewKeySerializerIsIncompatible() throws
Exception {
+ private void testBroadcastStateKeyUpgrade(
+ MapStateDescriptor<TestType, Integer>
initialAccessDescriptor,
+ MapStateDescriptor<TestType, Integer>
newAccessDescriptorAfterRestore) throws Exception {
+
CheckpointStreamFactory streamFactory = createStreamFactory();
OperatorStateBackend backend = createOperatorStateBackend();
- final String stateName = "broadcast-state";
try {
- MapStateDescriptor<TestType, Integer> descriptor = new
MapStateDescriptor<>(
- stateName,
- new TestType.V1TestTypeSerializer(),
- IntSerializer.INSTANCE);
- BroadcastState<TestType, Integer> state =
backend.getBroadcastState(descriptor);
+ BroadcastState<TestType, Integer> state =
backend.getBroadcastState(initialAccessDescriptor);
state.put(new TestType("foo", 13), 3);
state.put(new TestType("bar", 278), 5);
@@ -724,17 +833,13 @@ public void
testBroadcastStateRegistrationFailsIfNewKeySerializerIsIncompatible(
backend = restoreOperatorStateBackend(snapshot);
- descriptor = new MapStateDescriptor<>(
- stateName,
- new TestType.IncompatibleTestTypeSerializer(),
- IntSerializer.INSTANCE);
-
- // the new key serializer is INCOMPATIBLE, so
registering the state should fail
- backend.getBroadcastState(descriptor);
+ state =
backend.getBroadcastState(newAccessDescriptorAfterRestore);
- Assert.fail("should have failed.");
- } catch (Exception e) {
- Assert.assertTrue(ExceptionUtils.findThrowable(e,
StateMigrationException.class).isPresent());
+ // the state backend should have decided whether or not
it needs to perform state migration;
+ // make sure that reading and writing each broadcast
entry works with the new serializer
+ Assert.assertEquals((Integer) 3, state.get(new
TestType("foo", 13)));
+ Assert.assertEquals((Integer) 5, state.get(new
TestType("bar", 278)));
+ state.put(new TestType("new-entry", 777), 17);
} finally {
backend.dispose();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
index de1f2bd962c..91ebc958ae6 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
@@ -42,15 +42,15 @@
//
--------------------------------------------------------------------------------
@Test
- public void testCurrentSchemaSerializerForNewStateSerializerProvider() {
- StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer());
+ public void
testCurrentSchemaSerializerForEagerlyRegisteredStateSerializerProvider() {
+ StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromNewRegisteredSerializer(new
TestType.V1TestTypeSerializer());
assertTrue(testProvider.currentSchemaSerializer() instanceof
TestType.V1TestTypeSerializer);
}
@Test
- public void
testCurrentSchemaSerializerForRestoredStateSerializerProvider() {
+ public void
testCurrentSchemaSerializerForLazilyRegisteredStateSerializerProvider() {
TestType.V1TestTypeSerializer serializer = new
TestType.V1TestTypeSerializer();
- StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+ StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
assertTrue(testProvider.currentSchemaSerializer() instanceof
TestType.V1TestTypeSerializer);
}
@@ -59,17 +59,17 @@ public void
testCurrentSchemaSerializerForRestoredStateSerializerProvider() {
//
--------------------------------------------------------------------------------
@Test(expected = UnsupportedOperationException.class)
- public void testPreviousSchemaSerializerForNewStateSerializerProvider()
{
- StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer());
+ public void
testPreviousSchemaSerializerForEagerlyRegisteredStateSerializerProvider() {
+ StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromNewRegisteredSerializer(new
TestType.V1TestTypeSerializer());
// this should fail with an exception
testProvider.previousSchemaSerializer();
}
@Test
- public void
testPreviousSchemaSerializerForRestoredStateSerializerProvider() {
+ public void
testPreviousSchemaSerializerForLazilyRegisteredStateSerializerProvider() {
TestType.V1TestTypeSerializer serializer = new
TestType.V1TestTypeSerializer();
- StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+ StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
assertTrue(testProvider.previousSchemaSerializer() instanceof
TestType.V1TestTypeSerializer);
}
@@ -78,7 +78,7 @@ public void testLazyInstantiationOfPreviousSchemaSerializer()
{
// create the provider with an exception throwing snapshot;
// this would throw an exception if the restore serializer was
eagerly accessed
StateSerializerProvider<String> testProvider =
- StateSerializerProvider.fromRestoredState(new
ExceptionThrowingSerializerSnapshot());
+
StateSerializerProvider.fromPreviousSerializerSnapshot(new
ExceptionThrowingSerializerSnapshot());
try {
// if we fail here, that means the restore serializer
was indeed lazily accessed
@@ -94,15 +94,15 @@ public void
testLazyInstantiationOfPreviousSchemaSerializer() {
//
--------------------------------------------------------------------------------
@Test(expected = UnsupportedOperationException.class)
- public void
testRegisterNewSerializerWithNewStateSerializerProviderShouldFail() {
- StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromNewState(new TestType.V1TestTypeSerializer());
+ public void
testRegisterNewSerializerWithEagerlyRegisteredStateSerializerProviderShouldFail()
{
+ StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromNewRegisteredSerializer(new
TestType.V1TestTypeSerializer());
testProvider.registerNewSerializerForRestoredState(new
TestType.V2TestTypeSerializer());
}
@Test(expected = UnsupportedOperationException.class)
- public void
testRegisterNewSerializerTwiceWithNewStateSerializerProviderShouldFail() {
+ public void
testRegisterNewSerializerTwiceWithLazilyRegisteredStateSerializerProviderShouldFail()
{
TestType.V1TestTypeSerializer serializer = new
TestType.V1TestTypeSerializer();
- StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+ StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
testProvider.registerNewSerializerForRestoredState(new
TestType.V2TestTypeSerializer());
@@ -111,9 +111,9 @@ public void
testRegisterNewSerializerTwiceWithNewStateSerializerProviderShouldFa
}
@Test
- public void testRegisterNewCompatibleAsIsSerializer() {
+ public void testLazilyRegisterNewCompatibleAsIsSerializer() {
TestType.V1TestTypeSerializer serializer = new
TestType.V1TestTypeSerializer();
- StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+ StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
// register compatible serializer for state
TypeSerializerSchemaCompatibility<TestType> schemaCompatibility
=
@@ -125,20 +125,36 @@ public void testRegisterNewCompatibleAsIsSerializer() {
}
@Test
- public void testRegisterNewCompatibleAfterMigrationSerializer() {
+ public void testLazilyRegisterNewCompatibleAfterMigrationSerializer() {
TestType.V1TestTypeSerializer serializer = new
TestType.V1TestTypeSerializer();
- StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+ StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
// register serializer that requires migration for state
TypeSerializerSchemaCompatibility<TestType> schemaCompatibility
=
testProvider.registerNewSerializerForRestoredState(new
TestType.V2TestTypeSerializer());
assertTrue(schemaCompatibility.isCompatibleAfterMigration());
+
+ assertTrue(testProvider.currentSchemaSerializer() instanceof
TestType.V2TestTypeSerializer);
+ assertTrue(testProvider.previousSchemaSerializer() instanceof
TestType.V1TestTypeSerializer);
+ }
+
+ @Test
+ public void testLazilyRegisterNewSerializerRequiringReconfiguration() {
+ TestType.V1TestTypeSerializer serializer = new
TestType.V1TestTypeSerializer();
+ StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
+
+ // register serializer that requires reconfiguration, and
verify that
+ // the resulting current schema serializer is the reconfigured
one
+ TypeSerializerSchemaCompatibility<TestType> schemaCompatibility
=
+ testProvider.registerNewSerializerForRestoredState(new
TestType.ReconfigurationRequiringTestTypeSerializer());
+
assertTrue(schemaCompatibility.isCompatibleWithReconfiguredSerializer());
+ assertTrue(testProvider.currentSchemaSerializer().getClass() ==
TestType.V1TestTypeSerializer.class);
}
@Test
- public void testRegisterIncompatibleSerializer() {
+ public void testLazilyRegisterIncompatibleSerializer() {
TestType.V1TestTypeSerializer serializer = new
TestType.V1TestTypeSerializer();
- StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromRestoredState(serializer.snapshotConfiguration());
+ StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
// register serializer that requires migration for state
TypeSerializerSchemaCompatibility<TestType> schemaCompatibility
=
@@ -148,6 +164,88 @@ public void testRegisterIncompatibleSerializer() {
try {
// a serializer for the current schema will no longer
be accessible
testProvider.currentSchemaSerializer();
+
+ fail();
+ } catch (Exception excepted) {
+ // success
+ }
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Tests for
#setPreviousSerializerSnapshotForRestoredState(TypeSerializerSnapshot)
+ //
--------------------------------------------------------------------------------
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void
testSetSerializerSnapshotWithLazilyRegisteredSerializerProviderShouldFail() {
+ TestType.V1TestTypeSerializer serializer = new
TestType.V1TestTypeSerializer();
+ StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromPreviousSerializerSnapshot(serializer.snapshotConfiguration());
+
+
testProvider.setPreviousSerializerSnapshotForRestoredState(serializer.snapshotConfiguration());
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void
testSetSerializerSnapshotTwiceWithEagerlyRegisteredSerializerProviderShouldFail()
{
+ TestType.V1TestTypeSerializer serializer = new
TestType.V1TestTypeSerializer();
+ StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromNewRegisteredSerializer(serializer);
+
+
testProvider.setPreviousSerializerSnapshotForRestoredState(serializer.snapshotConfiguration());
+
+ // second registration should fail
+
testProvider.setPreviousSerializerSnapshotForRestoredState(serializer.snapshotConfiguration());
+ }
+
+ @Test
+ public void testEagerlyRegisterNewCompatibleAsIsSerializer() {
+ StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromNewRegisteredSerializer(new
TestType.V1TestTypeSerializer());
+
+ // set previous serializer snapshot for state, which should let
the new serializer be considered compatible as is
+ TypeSerializerSchemaCompatibility<TestType> schemaCompatibility
=
+
testProvider.setPreviousSerializerSnapshotForRestoredState(new
TestType.V1TestTypeSerializer().snapshotConfiguration());
+ assertTrue(schemaCompatibility.isCompatibleAsIs());
+
+ assertTrue(testProvider.currentSchemaSerializer() instanceof
TestType.V1TestTypeSerializer);
+ assertTrue(testProvider.previousSchemaSerializer() instanceof
TestType.V1TestTypeSerializer);
+ }
+
+ @Test
+ public void testEagerlyRegisterCompatibleAfterMigrationSerializer() {
+ StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromNewRegisteredSerializer(new
TestType.V2TestTypeSerializer());
+
+ // set previous serializer snapshot for state, which should let
the new serializer be considered compatible after migration
+ TypeSerializerSchemaCompatibility<TestType> schemaCompatibility
=
+
testProvider.setPreviousSerializerSnapshotForRestoredState(new
TestType.V1TestTypeSerializer().snapshotConfiguration());
+ assertTrue(schemaCompatibility.isCompatibleAfterMigration());
+
+ assertTrue(testProvider.currentSchemaSerializer() instanceof
TestType.V2TestTypeSerializer);
+ assertTrue(testProvider.previousSchemaSerializer() instanceof
TestType.V1TestTypeSerializer);
+ }
+
+ @Test
+ public void testEagerlyRegisterNewSerializerRequiringReconfiguration() {
+ StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromNewRegisteredSerializer(new
TestType.ReconfigurationRequiringTestTypeSerializer());
+
+ // set previous serializer snapshot, which should let the new
serializer be considered to require reconfiguration,
+ // and verify that the resulting current schema serializer is
the reconfigured one
+ TypeSerializerSchemaCompatibility<TestType> schemaCompatibility
=
+
testProvider.setPreviousSerializerSnapshotForRestoredState(new
TestType.V1TestTypeSerializer().snapshotConfiguration());
+
assertTrue(schemaCompatibility.isCompatibleWithReconfiguredSerializer());
+ assertTrue(testProvider.currentSchemaSerializer().getClass() ==
TestType.V1TestTypeSerializer.class);
+ }
+
+ @Test
+ public void testEagerlyRegisterIncompatibleSerializer() {
+ StateSerializerProvider<TestType> testProvider =
StateSerializerProvider.fromNewRegisteredSerializer(new
TestType.IncompatibleTestTypeSerializer());
+
+ // set previous serializer snapshot for state, which should let
the new serializer be considered incompatible
+ TypeSerializerSchemaCompatibility<TestType> schemaCompatibility
=
+
testProvider.setPreviousSerializerSnapshotForRestoredState(new
TestType.V1TestTypeSerializer().snapshotConfiguration());
+ assertTrue(schemaCompatibility.isIncompatible());
+
+ try {
+ // a serializer for the current schema will no longer
be accessible
+ testProvider.currentSchemaSerializer();
+
+ fail();
} catch (Exception excepted) {
// success
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
index e3b0a066e51..52d5521aae8 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
@@ -148,6 +148,29 @@ public TestType deserialize(DataInputView source) throws
IOException {
}
}
+ /**
+ * A serializer that is meant to be compatible with any of the
serializers only after being reconfigured as a new instance.
+ */
+ public static class ReconfigurationRequiringTestTypeSerializer extends
TestTypeSerializerBase {
+
+ private static final long serialVersionUID =
-7254527815207212324L;
+
+ @Override
+ public void serialize(TestType record, DataOutputView target)
throws IOException {
+ throw new UnsupportedOperationException("The serializer
should have been reconfigured as a new instance; shouldn't be used.");
+ }
+
+ @Override
+ public TestType deserialize(DataInputView source) throws
IOException {
+ throw new UnsupportedOperationException("The serializer
should have been reconfigured as a new instance; shouldn't be used.");
+ }
+
+ @Override
+ public TypeSerializerSnapshot<TestType> snapshotConfiguration()
{
+ throw new UnsupportedOperationException("The serializer
should have been reconfigured as a new instance; shouldn't be used.");
+ }
+ }
+
/**
* A serializer that is meant to be incompatible with any of the
serializers.
*/
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
index b2b802a30c2..9f78ce32fdd 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
@@ -42,8 +42,13 @@ public int getCurrentVersion() {
return
TypeSerializerSchemaCompatibility.compatibleAsIs();
} else if (newSerializer instanceof
TestType.V2TestTypeSerializer) {
return
TypeSerializerSchemaCompatibility.compatibleAfterMigration();
- } else {
+ } else if (newSerializer instanceof
TestType.ReconfigurationRequiringTestTypeSerializer) {
+ // we mimic the reconfiguration by just
re-instantiating the correct serializer
+ return
TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(new
TestType.V1TestTypeSerializer());
+ } else if (newSerializer instanceof
TestType.IncompatibleTestTypeSerializer) {
return TypeSerializerSchemaCompatibility.incompatible();
+ } else {
+ throw new IllegalStateException("Unknown serializer
class for TestType.");
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
index 3cd4fff8d86..983a2ae88c5 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
@@ -40,8 +40,17 @@ public int getCurrentVersion() {
public TypeSerializerSchemaCompatibility<TestType>
resolveSchemaCompatibility(TypeSerializer<TestType> newSerializer) {
if (newSerializer instanceof TestType.V2TestTypeSerializer) {
return
TypeSerializerSchemaCompatibility.compatibleAsIs();
- } else {
+ } else if (newSerializer instanceof
TestType.ReconfigurationRequiringTestTypeSerializer) {
+ // we mimic the reconfiguration by just
re-instantiating the correct serializer
+ return
TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(new
TestType.V2TestTypeSerializer());
+ } else if (
+ // migrating from V2 -> V1 is not supported
+ newSerializer instanceof TestType.V1TestTypeSerializer
+ || newSerializer instanceof
TestType.IncompatibleTestTypeSerializer) {
+
return TypeSerializerSchemaCompatibility.incompatible();
+ } else {
+ throw new IllegalStateException("Unknown serializer
class for TestType.");
}
}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 700c5468c8c..5f914ff4645 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -336,7 +336,7 @@ private static void checkAndCreateDirectory(File directory)
throws IOException {
final TypeSerializer<N> namespaceSerializer =
registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
final DataOutputSerializer namespaceOutputView = new
DataOutputSerializer(8);
- boolean ambiguousKeyPossible =
RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer,
namespaceSerializer);
+ boolean ambiguousKeyPossible =
RocksDBKeySerializationUtils.isAmbiguousKeyPossible(getKeySerializer(),
namespaceSerializer);
final byte[] nameSpaceBytes;
try {
RocksDBKeySerializationUtils.writeNameSpace(
@@ -352,7 +352,7 @@ private static void checkAndCreateDirectory(File directory)
throws IOException {
RocksIteratorWrapper iterator = getRocksIterator(db,
columnInfo.f0);
iterator.seekToFirst();
- final RocksStateKeysIterator<K> iteratorWrapper = new
RocksStateKeysIterator<>(iterator, state, keySerializer, keyGroupPrefixBytes,
+ final RocksStateKeysIterator<K> iteratorWrapper = new
RocksStateKeysIterator<>(iterator, state, getKeySerializer(),
keyGroupPrefixBytes,
ambiguousKeyPossible, nameSpaceBytes);
Stream<K> targetStream =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper,
Spliterator.ORDERED), false);
@@ -536,7 +536,7 @@ void initializeSnapshotStrategy(
new RocksFullSnapshotStrategy<>(
db,
rocksDBResourceGuard,
- keySerializer,
+ getKeySerializer(),
kvStateInformation,
keyGroupRange,
keyGroupPrefixBytes,
@@ -563,7 +563,7 @@ void initializeSnapshotStrategy(
this.checkpointSnapshotStrategy = new
RocksIncrementalSnapshotStrategy<>(
db,
rocksDBResourceGuard,
- keySerializer,
+ getKeySerializer(),
kvStateInformation,
keyGroupRange,
keyGroupPrefixBytes,
@@ -655,6 +655,8 @@ private RocksDB openDB(
/** The compression decorator that was used for writing the
state, as determined by the meta data. */
private StreamCompressionDecorator
keygroupStreamCompressionDecorator;
+ private boolean isKeySerializerCompatibilityChecked;
+
/**
* Creates a restore operation object for the given state
backend instance.
*
@@ -720,11 +722,16 @@ private void restoreKVStateMetaData() throws IOException,
StateMigrationExceptio
serializationProxy.read(currentStateHandleInView);
- // check for key serializer compatibility; this also
reconfigures the
- // key serializer to be compatible, if it is required
and is possible
- if (!serializationProxy.getKeySerializerConfigSnapshot()
-
.resolveSchemaCompatibility(rocksDBKeyedStateBackend.keySerializer).isCompatibleAsIs())
{
- throw new StateMigrationException("The new key
serializer must be compatible.");
+ if (!isKeySerializerCompatibilityChecked) {
+ // check for key serializer compatibility; this
also reconfigures the
+ // key serializer to be compatible, if it is
required and is possible
+ TypeSerializerSchemaCompatibility<K>
keySerializerSchemaCompat =
+
rocksDBKeyedStateBackend.checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerSnapshot());
+ if
(keySerializerSchemaCompat.isCompatibleAfterMigration() ||
keySerializerSchemaCompat.isIncompatible()) {
+ throw new StateMigrationException("The
new key serializer must be compatible.");
+ }
+
+ isKeySerializerCompatibilityChecked = true;
}
this.keygroupStreamCompressionDecorator =
serializationProxy.isUsingKeyGroupCompression() ?
@@ -821,6 +828,7 @@ private void restoreKVStateData() throws IOException,
RocksDBException {
private final SortedMap<Long, Set<StateHandleID>>
restoredSstFiles;
private UUID restoredBackendUID;
private long lastCompletedCheckpointId;
+ private boolean isKeySerializerCompatibilityChecked;
private
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
@@ -1265,11 +1273,16 @@ private void restoreInstanceDirectoryFromPath(Path
source) throws IOException {
DataInputView in = new
DataInputViewStreamWrapper(inputStream);
serializationProxy.read(in);
- // check for key serializer compatibility; this
also reconfigures the
- // key serializer to be compatible, if it is
required and is possible
- if
(!serializationProxy.getKeySerializerConfigSnapshot()
-
.resolveSchemaCompatibility(stateBackend.keySerializer).isCompatibleAsIs()) {
- throw new StateMigrationException("The
new key serializer must be compatible.");
+ if (!isKeySerializerCompatibilityChecked) {
+ // check for key serializer
compatibility; this also reconfigures the
+ // key serializer to be compatible, if
it is required and is possible
+ TypeSerializerSchemaCompatibility<T>
keySerializerSchemaCompat =
+
stateBackend.checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerSnapshot());
+ if
(keySerializerSchemaCompat.isCompatibleAfterMigration() ||
keySerializerSchemaCompat.isIncompatible()) {
+ throw new
StateMigrationException("The new key serializer must be compatible.");
+ }
+
+ isKeySerializerCompatibilityChecked =
true;
}
return
serializationProxy.getStateMetaInfoSnapshots();
@@ -1346,7 +1359,7 @@ private void restoreInstanceDirectoryFromPath(Path
source) throws IOException {
restoredKvStateMetaInfo.updateSnapshotTransformer(snapshotTransformer);
TypeSerializerSchemaCompatibility<N> s =
restoredKvStateMetaInfo.updateNamespaceSerializer(namespaceSerializer);
- if (!s.isCompatibleAsIs()) {
+ if (s.isCompatibleAfterMigration() || s.isIncompatible()) {
throw new StateMigrationException("The new namespace
serializer must be compatible.");
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services