tzulitai closed pull request #6930: [FLINK-10679] Remove deprecated
CompatibilityResult and related classes from framework code
URL: https://github.com/apache/flink/pull/6930
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleSerializerSnapshot.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleSerializerSnapshot.java
index 49d03db9830..4b72902264f 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleSerializerSnapshot.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleSerializerSnapshot.java
@@ -24,6 +24,7 @@
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
+
import java.io.IOException;
/**
@@ -48,13 +49,13 @@ public
BackwardsCompatibleSerializerSnapshot(TypeSerializer<T> serializerInstanc
}
@Override
- public void write(DataOutputView out) throws IOException {
+ public void writeSnapshot(DataOutputView out) throws IOException {
throw new UnsupportedOperationException(
"This is a dummy config snapshot used only for
backwards compatibility.");
}
@Override
- public void read(int version, DataInputView in, ClassLoader
userCodeClassLoader) throws IOException {
+ public void readSnapshot(int version, DataInputView in, ClassLoader
userCodeClassLoader) throws IOException {
throw new UnsupportedOperationException(
"This is a dummy config snapshot used only for
backwards compatibility.");
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
index a188a4d23db..ae19d038c39 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
@@ -68,7 +68,9 @@
if (precedingSerializerConfigSnapshot != null
&& !(precedingSerializerConfigSnapshot instanceof
BackwardsCompatibleSerializerSnapshot)) {
- CompatibilityResult<T> initialResult =
newSerializer.ensureCompatibility(precedingSerializerConfigSnapshot);
+ CompatibilityResult<T> initialResult =
resolveCompatibilityResult(
+ (TypeSerializerSnapshot<T>)
precedingSerializerConfigSnapshot,
+ newSerializer);
if (!initialResult.isRequiresMigration()) {
return initialResult;
@@ -89,4 +91,19 @@
}
}
+ public static <T> CompatibilityResult<T> resolveCompatibilityResult(
+ TypeSerializerSnapshot<T>
precedingSerializerConfigSnapshot,
+ TypeSerializer<T> newSerializer) {
+
+ TypeSerializerSchemaCompatibility<T, TypeSerializer<T>>
compatibility =
+
precedingSerializerConfigSnapshot.resolveSchemaCompatibility(newSerializer);
+
+ // everything except "compatible" maps to "requires migration".
+ // at the entry point of the new-to-old-bridge (in the
TypeSerializerConfigSnapshot), we
+ // interpret "requiresMigration" as 'incompatible'. That is a
precaution because
+ // serializers could previously not specify the 'incompatible'
case.
+ return compatibility.isCompatibleAsIs() ?
+ CompatibilityResult.compatible() :
+ CompatibilityResult.requiresMigration();
+ }
}
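For illustration, the migration that the new exception message in TypeSerializer#ensureCompatibility (further below) asks for could look like the following sketch of a legacy composite serializer: its deprecated ensureCompatibility() used to delegate to the element serializer's ensureCompatibility() and now delegates through the bridge method above. The list serializer, its MyListSerializerConfigSnapshot class, and the getElementSnapshot() accessor are hypothetical, not part of this PR:

	@Deprecated
	@Override
	public CompatibilityResult<List<E>> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
		if (configSnapshot instanceof MyListSerializerConfigSnapshot) {
			// hypothetical accessor for the snapshot of the nested element serializer
			TypeSerializerSnapshot<E> elementSnapshot =
				((MyListSerializerConfigSnapshot<E>) configSnapshot).getElementSnapshot();

			// previously: elementSerializer.ensureCompatibility(elementSnapshot)
			CompatibilityResult<E> elementResult =
				CompatibilityUtil.resolveCompatibilityResult(elementSnapshot, elementSerializer);

			return elementResult.isRequiresMigration()
				? CompatibilityResult.requiresMigration()
				: CompatibilityResult.compatible();
		}
		return CompatibilityResult.requiresMigration();
	}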
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 7a1675eafba..ddb0b87e525 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -18,7 +18,6 @@
package org.apache.flink.api.common.typeutils;
-import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -29,11 +28,37 @@
/**
* This interface describes the methods that are required for a data type to
be handled by the Flink
* runtime. Specifically, this interface contains the serialization and
copying methods.
- * <p>
- * The methods in this class are assumed to be stateless, such that it is
effectively thread safe. Stateful
+ *
+ * <p>The methods in this class are assumed to be stateless, such that it is
effectively thread safe. Stateful
* implementations of the methods may lead to unpredictable side effects and
will compromise both stability and
* correctness of the program.
- *
+ *
+ * <p><b>Upgrading TypeSerializers to the new TypeSerializerSnapshot model</b>
+ *
+ * <p>This section is relevant if you implemented a TypeSerializer in Flink
versions up to 1.6 and want
+ * to adapt that implementation to the new interfaces that support proper
state schema evolution. Please
+ * follow these steps:
+ *
+ * <ul>
+ * <li>Change the type serializer's config snapshot to implement {@link
TypeSerializerSnapshot}, rather
+ * than extending {@code TypeSerializerConfigSnapshot} (as previously).
+ * <li>Move the compatibility check from the {@link
TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)}
+ * method to the {@link
TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)} method.
+ * </ul>
+ *
+ * <p><b>Maintaining Backwards Compatibility</b>
+ *
+ * <p>If you want your serializer to be able to restore checkpoints from Flink
1.6 and before, add the steps
+ * below in addition to the steps above.
+ *
+ * <ul>
+ * <li>Retain the old serializer snapshot class (extending {@code
TypeSerializerConfigSnapshot}) under
+ * the same name and give the updated serializer snapshot class (the one implementing {@code TypeSerializerSnapshot})
+ * a new name.
+ * <li>Keep the {@link TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)} method on the
+ * TypeSerializer as well.
+ * </ul>
+ *
* @param <T> The data type that the serializer serializes.
*/
@PublicEvolving
@@ -163,85 +188,55 @@
public abstract int hashCode();
//
--------------------------------------------------------------------------------------------
- // Serializer configuration snapshotting & compatibility
+ // Serializer configuration snapshot for checkpoints/savepoints
//
--------------------------------------------------------------------------------------------
/**
- * Create a snapshot of the serializer's current configuration to be
stored along with the managed state it is
- * registered to (if any - this method is only relevant if this
serializer is registered for serialization of
- * managed state).
+ * Snapshots the configuration of this TypeSerializer. This method is
only relevant if the serializer is
+ * used for state stored in checkpoints/savepoints.
*
- * <p>The configuration snapshot should contain information about the
serializer's parameter settings and its
- * serialization format. When a new serializer is registered to
serialize the same managed state that this
- * serializer was registered to, the returned configuration snapshot
can be used to ensure compatibility
- * of the new serializer and determine if state migration is required.
+ * <p>The snapshot of the TypeSerializer is supposed to contain all
information that affects the serialization
+ * format of the serializer. The snapshot serves two purposes: First,
to reproduce the serializer when the
+ * checkpoint/savepoint is restored, and second, to check whether the
serialization format is compatible
+ * with the serializer used in the restored program.
*
- * @see TypeSerializerSnapshot
+ * <p><b>IMPORTANT:</b> TypeSerializerSnapshots changed after Flink
1.6. Serializers implemented against
+ * Flink versions up to 1.6 should still work, but should be adjusted to the new model to enable
+ * state evolution and be future-proof.
+ * See the class-level comments, section "Upgrading TypeSerializers to
the new TypeSerializerSnapshot model"
+ * for details.
+ *
+ * @see
TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)
*
* @return snapshot of the serializer's current configuration (cannot
be {@code null}).
*/
public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
+ //
--------------------------------------------------------------------------------------------
+ // Deprecated methods for backwards compatibility
+ //
--------------------------------------------------------------------------------------------
+
/**
- * Ensure compatibility of this serializer with a preceding serializer
that was registered for serialization of
- * the same managed state (if any - this method is only relevant if
this serializer is registered for
- * serialization of managed state).
- *
- * <p>The compatibility check in this method should be performed by
inspecting the preceding serializer's configuration
- * snapshot. The method may reconfigure the serializer (if required and
possible) so that it may be compatible,
- * or provide a signaling result that informs Flink that state
migration is necessary before continuing to use
- * this serializer.
- *
- * <p>The result can be one of the following:
- * <ul>
- * <li>{@link CompatibilityResult#compatible()}: this signals Flink
that this serializer is compatible, or
- * has been reconfigured to be compatible, to continue reading
previous data, and that the
- * serialization schema remains the same. No migration needs to be
performed.</li>
- *
- * <li>{@link
CompatibilityResult#requiresMigration(TypeDeserializer)}: this signals Flink
that
- * migration needs to be performed, because this serializer is not
compatible, or cannot be reconfigured to be
- * compatible, for previous data. Furthermore, in the case that the
preceding serializer cannot be found or
- * restored to read the previous data to perform the migration, the
provided convert deserializer can be
- * used as a fallback resort.</li>
+ * This method is deprecated. It used to resolve compatibility of the serializer with serializer
+ * config snapshots in checkpoints. The responsibility for this has
moved to
+ * {@link
TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)}.
*
- * <li>{@link CompatibilityResult#requiresMigration()}: this
signals Flink that migration needs to be
- * performed, because this serializer is not compatible, or cannot
be reconfigured to be compatible, for
- * previous data. If the preceding serializer cannot be found
(either its implementation changed or it was
- * removed from the classpath) then the migration will fail due to
incapability to read previous data.</li>
- * </ul>
+ * <p>New serializers should not override this method any more!
Serializers implemented against Flink
+ * versions up to 1.6 should still work, but should be adjusted to the new model to enable state evolution and
+ * be future-proof. See the class-level comments, section <i>"Upgrading
TypeSerializers to the new
+ * TypeSerializerSnapshot model"</i> for details.
*
- * @see CompatibilityResult
- *
- * @param configSnapshot configuration snapshot of a preceding
serializer for the same managed state
- *
- * @return the determined compatibility result (cannot be {@code null}).
+ * @deprecated Replaced by {@link
TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)}.
*/
@Deprecated
public CompatibilityResult<T>
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
- throw new IllegalStateException(
- "Seems like that you are still using
TypeSerializerConfigSnapshot; if so, this method must be implemented. " +
- "Once you change to directly use
TypeSerializerSnapshot, then you can safely remove the implementation " +
- "of this method.");
- }
-
- @Internal
- public final CompatibilityResult<T>
ensureCompatibility(TypeSerializerSnapshot<?> configSnapshot) {
- if (configSnapshot instanceof TypeSerializerConfigSnapshot) {
- return
ensureCompatibility((TypeSerializerConfigSnapshot<?>) configSnapshot);
- } else {
- @SuppressWarnings("unchecked")
- TypeSerializerSnapshot<T> casted =
(TypeSerializerSnapshot<T>) configSnapshot;
-
- TypeSerializerSchemaCompatibility<T, ? extends
TypeSerializer<T>> compat = casted.resolveSchemaCompatibility(this);
- if (compat.isCompatibleAsIs()) {
- return CompatibilityResult.compatible();
- } else if (compat.isCompatibleAfterMigration()) {
- return CompatibilityResult.requiresMigration();
- } else if (compat.isIncompatible()) {
- throw new IllegalStateException("The new
serializer is incompatible.");
- } else {
- throw new IllegalStateException("Unidentifiable
schema compatibility type. This is a bug, please file a JIRA.");
- }
- }
+ throw new UnsupportedOperationException(
+ "This method is not supported any more - please
evolve your TypeSerializer the following way:\n\n" +
+ " - If you have a serializer whose
'ensureCompatibility()' method delegates to another\n" +
+ " serializer's 'ensureCompatibility()',
please use" +
+
"'CompatibilityUtil.resolveCompatibilityResult(snapshot, this)' instead.\n\n" +
+ " - If you updated your serializer (removed
overriding the 'ensureCompatibility()' method),\n" +
+ " please also update the corresponding
config snapshot to not extend 'TypeSerializerConfigSnapshot'" +
+ "any more.\n\n");
}
}
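To make the upgrade steps from the class-level Javadoc above concrete, a snapshot implemented directly against the new interface could look like this minimal sketch for Flink's stateless IntSerializer (the snapshot class itself is illustrative, not part of this PR):

	public final class IntSerializerSnapshotSketch implements TypeSerializerSnapshot<Integer> {

		@Override
		public int getCurrentVersion() {
			return 1;
		}

		@Override
		public void writeSnapshot(DataOutputView out) throws IOException {
			// a stateless serializer has no parameters affecting its format,
			// so nothing needs to be written beyond the version
		}

		@Override
		public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
			// nothing to read for a stateless serializer
		}

		@Override
		public TypeSerializer<Integer> restoreSerializer() {
			return IntSerializer.INSTANCE;
		}

		@Override
		public <NS extends TypeSerializer<Integer>> TypeSerializerSchemaCompatibility<Integer, NS> resolveSchemaCompatibility(NS newSerializer) {
			// the compatibility check now lives in the snapshot rather than in
			// TypeSerializer#ensureCompatibility
			return newSerializer instanceof IntSerializer
				? TypeSerializerSchemaCompatibility.compatibleAsIs()
				: TypeSerializerSchemaCompatibility.incompatible();
		}
	}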
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
index 236b994b295..9ae275a056a 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
@@ -22,92 +22,39 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.io.VersionedIOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
+import static org.apache.flink.util.Preconditions.checkState;
+
/**
- * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link
TypeSerializer's} configuration.
- * The configuration snapshot of a serializer is persisted within checkpoints
- * as a single source of meta information about the schema of serialized data
in the checkpoint.
- * This serves three purposes:
- *
- * <ul>
- * <li><strong>Capturing serializer parameters and schema:</strong> a
serializer's configuration snapshot
- * represents information about the parameters, state, and schema of a
serializer.
- * This is explained in more detail below.</li>
- *
- * <li><strong>Compatibility checks for new serializers:</strong> when new
serializers are available,
- * they need to be checked whether or not they are compatible to read the
data written by the previous serializer.
- * This is performed by providing the new serializer to the corresponding
serializer configuration
- * snapshots in checkpoints.</li>
- *
- * <li><strong>Factory for a read serializer when schema conversion is
required:<strong> in the case that new
- * serializers are not compatible to read previous data, a schema conversion
process executed across all data
- * is required before the new serializer can be continued to be used. This
conversion process requires a compatible
- * read serializer to restore serialized bytes as objects, and then written
back again using the new serializer.
- * In this scenario, the serializer configuration snapshots in checkpoints
doubles as a factory for the read
- * serializer of the conversion process.</li>
- * </ul>
- *
- * <h2>Serializer Configuration and Schema</h2>
- *
- * <p>Since serializer configuration snapshots needs to be used to ensure
serialization compatibility
- * for the same managed state as well as serving as a factory for compatible
read serializers, the configuration
- * snapshot should encode sufficient information about:
- *
- * <ul>
- * <li><strong>Parameter settings of the serializer:</strong> parameters of
the serializer include settings
- * required to setup the serializer, or the state of the serializer if it is
stateful. If the serializer
- * has nested serializers, then the configuration snapshot should also
contain the parameters of the nested
- * serializers.</li>
+ * This class bridges between the old serializer config snapshot interface
(this class) and the new
+ * serializer config snapshot interface ({@link TypeSerializerSnapshot}).
*
- * <li><strong>Serialization schema of the serializer:</strong> the binary
format used by the serializer, or
- * in other words, the schema of data written by the serializer.</li>
- * </ul>
- *
- * <p>NOTE: Implementations must contain the default empty nullary
constructor. This is required to be able to
- * deserialize the configuration snapshot from its binary form.
- *
- * @param <T> The data type that the originating serializer of this
configuration serializes.
- *
- * @deprecated This class has been deprecated since Flink 1.7, and will
eventually be removed.
- * Please refer to, and directly implement a {@link
TypeSerializerSnapshot} instead.
- * Class-level Javadocs of {@link TypeSerializerSnapshot} provides
more details
- * on migrating to the new interface.
+ * <p>Serializers that create snapshots and compatibility checks with the old interfaces extend this class
+ * and should migrate to extend {@code TypeSerializerSnapshot} to properly
support state evolution/migration
+ * and be future-proof.
*/
@PublicEvolving
@Deprecated
public abstract class TypeSerializerConfigSnapshot<T> extends
VersionedIOReadableWritable implements TypeSerializerSnapshot<T> {
+ /** Version / Magic number for the format that bridges between the old
and new interface. */
+ private static final int ADAPTER_VERSION = 0x7a53c4f0;
+
/** The user code class loader; only relevant if this configuration
instance was deserialized from binary form. */
private ClassLoader userCodeClassLoader;
- /**
- * The originating serializer of this configuration snapshot.
- */
+ /** The originating serializer of this configuration snapshot. */
private TypeSerializer<T> serializer;
- /**
- * Creates a serializer using this configuration, that is capable of
reading data
- * written by the serializer described by this configuration.
- *
- * @return the restored serializer.
- */
- public TypeSerializer<T> restoreSerializer() {
- if (serializer != null) {
- return this.serializer;
- } else {
- throw new IllegalStateException("Trying to restore the
prior serializer via TypeSerializerConfigSnapshot, " +
- "but the prior serializer has not been set.");
- }
- }
-
/**
* Set the originating serializer of this configuration snapshot.
*/
@Internal
- public void setPriorSerializer(TypeSerializer<T> serializer) {
+ public final void setPriorSerializer(TypeSerializer<T> serializer) {
this.serializer = Preconditions.checkNotNull(serializer);
}
@@ -135,26 +82,77 @@ public final ClassLoader getUserCodeClassLoader() {
return userCodeClassLoader;
}
- public abstract boolean equals(Object obj);
-
- public abstract int hashCode();
-
//
----------------------------------------------------------------------------
- // Irrelevant methods; these methods should only ever be used when the
new interface is directly implemented.
+ // Implementation of the TypeSerializerSnapshot interface
//
----------------------------------------------------------------------------
@Override
- public int getCurrentVersion() {
- throw new UnsupportedOperationException();
+ public final int getCurrentVersion() {
+ return ADAPTER_VERSION;
+ }
+
+ @Override
+ public final void writeSnapshot(DataOutputView out) throws IOException {
+ checkState(serializer != null, "the prior serializer has not
been set on this");
+
+ // write the snapshot for a non-updated serializer.
+ // this mimics the previous behavior where the TypeSerializer
was
+ // Java-serialized, for backwards compatibility
+ TypeSerializerSerializationUtil.writeSerializer(out,
serializer);
+
+ // now delegate to the snapshot's own writing code
+ write(out);
+ }
+
+ @Override
+ public final void readSnapshot(int readVersion, DataInputView in,
ClassLoader userCodeClassLoader) throws IOException {
+ if (readVersion != ADAPTER_VERSION) {
+ throw new IOException("Wrong/unexpected version for the
TypeSerializerConfigSnapshot: " + readVersion);
+ }
+
+ serializer =
TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader,
true);
+
+ // now delegate to the snapshot's own reading code
+ setUserCodeClassLoader(userCodeClassLoader);
+ read(in);
}
+ /**
+ * Creates a serializer using this configuration, that is capable of
reading data
+ * written by the serializer described by this configuration.
+ *
+ * @return the restored serializer.
+ */
@Override
- public final void read(int readVersion, DataInputView in, ClassLoader
userCodeClassLoader) throws IOException {
- throw new UnsupportedOperationException();
+ public final TypeSerializer<T> restoreSerializer() {
+ if (serializer == null) {
+ throw new IllegalStateException(
+ "Trying to restore the prior serializer
via TypeSerializerConfigSnapshot, " +
+ "but the prior serializer has not been
set.");
+ }
+ else if (serializer instanceof UnloadableDummyTypeSerializer) {
+ Throwable originalError =
((UnloadableDummyTypeSerializer<?>) serializer).getOriginalError();
+
+ throw new IllegalStateException(
+ "Could not Java-deserialize
TypeSerializer while restoring checkpoint metadata for serializer " +
+ "snapshot '" + getClass().getName() +
"'. " +
+ "Please update to the
TypeSerializerSnapshot interface that removes Java Serialization to avoid " +
+ "this problem in the future.",
originalError);
+ } else {
+ return this.serializer;
+ }
}
@Override
- public final <NS extends TypeSerializer<T>>
TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(NS
newSerializer) {
- throw new UnsupportedOperationException();
+ public final <NS extends TypeSerializer<T>>
TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(
+ NS newSerializer) {
+
+ // in prior versions, the compatibility check was in the
serializer itself, so we
+ // delegate this call to the serializer.
+ final CompatibilityResult<T> compatibility =
newSerializer.ensureCompatibility(this);
+
+ return compatibility.isRequiresMigration() ?
+
TypeSerializerSchemaCompatibility.incompatible() :
+
TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
index e7cdc382f60..7a4ee5bd606 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
@@ -118,7 +118,7 @@
} catch (UnloadableTypeSerializerException e) {
if (useDummyPlaceholder) {
LOG.warn("Could not read a requested
serializer. Replaced with a UnloadableDummyTypeSerializer.", e.getCause());
- return new
UnloadableDummyTypeSerializer<>(e.getSerializerBytes());
+ return new
UnloadableDummyTypeSerializer<>(e.getSerializerBytes(), e.getCause());
} else {
throw e;
}
@@ -232,8 +232,6 @@ public static void writeSerializersAndConfigsWithResilience(
*/
public static final class TypeSerializerSerializationProxy<T> extends
VersionedIOReadableWritable {
- private static final Logger LOG =
LoggerFactory.getLogger(TypeSerializerSerializationProxy.class);
-
private static final int VERSION = 1;
private ClassLoader userClassLoader;
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
index 067d43d480d..4715d1119bf 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
@@ -25,7 +25,7 @@
import java.io.IOException;
/**
- * A {@code TypeSerializerSnapshot} is a point-in-time view of a {@link
TypeSerializer's} configuration.
+ * A {@code TypeSerializerSnapshot} is a point-in-time view of a {@link
TypeSerializer}'s configuration.
* The configuration snapshot of a serializer is persisted within checkpoints
* as a single source of meta information about the schema of serialized data
in the checkpoint.
* This serves three purposes:
@@ -40,7 +40,7 @@
* This is performed by providing the new serializer to the corresponding serializer configuration
* snapshots in checkpoints.</li>
*
- * <li><strong>Factory for a read serializer when schema conversion is
required:<strong> in the case that new
+ * <li><strong>Factory for a read serializer when schema conversion is
required:</strong> in the case that new
* serializers are not compatible to read previous data, a schema conversion
process executed across all data
* is required before the new serializer can be continued to be used. This
conversion process requires a compatible
* read serializer to restore serialized bytes as objects, and then written
back again using the new serializer.
@@ -86,9 +86,9 @@
*
* @param out the {@link DataOutputView} to write the snapshot to.
*
- * @throws IOException
+ * @throws IOException Thrown if the snapshot data could not be written.
*/
- void write(DataOutputView out) throws IOException;
+ void writeSnapshot(DataOutputView out) throws IOException;
/**
* Reads the serializer snapshot from the provided {@link
DataInputView}.
@@ -100,9 +100,9 @@
* @param in the {@link DataInputView} to read the snapshot from.
* @param userCodeClassLoader the user code classloader
*
- * @throws IOException
+ * @throws IOException Thrown if the snapshot data could not be read or parsed.
*/
- void read(int readVersion, DataInputView in, ClassLoader
userCodeClassLoader) throws IOException;
+ void readSnapshot(int readVersion, DataInputView in, ClassLoader
userCodeClassLoader) throws IOException;
/**
* Recreates a serializer instance from this snapshot. The returned
@@ -114,8 +114,16 @@
TypeSerializer<T> restoreSerializer();
/**
- * Checks a new serializer's compatibility to read data written by the
prior
- * serializer.
+ * Checks a new serializer's compatibility to read data written by the
prior serializer.
+ *
+ * <p>When a checkpoint/savepoint is restored, this method checks
whether the serialization
+ * format of the data in the checkpoint/savepoint is compatible with the format of the serializer used by the
+ * program that restores the checkpoint/savepoint. The outcome can be
that the serialization format is
+ * compatible, that the program's serializer needs to reconfigure
itself (meaning to incorporate some
+ * information from the TypeSerializerSnapshot to be compatible), that
the format is outright incompatible,
+ * or that a migration is needed. In the latter case, the
TypeSerializerSnapshot produces a serializer to
+ * deserialize the data, and the restoring program's serializer
re-serializes the data, thus converting
+ * the format during the restore operation.
*
* @param newSerializer the new serializer to check.
* @param <NS> the type of the new serializer
@@ -124,4 +132,35 @@
*/
<NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS>
resolveSchemaCompatibility(NS newSerializer);
+ //
------------------------------------------------------------------------
+ // read / write utilities
+ //
------------------------------------------------------------------------
+
+ /**
+ * Writes the given snapshot to the out stream. One should always use
this method to write
+ * snapshots out, rather than directly calling {@link
#writeSnapshot(DataOutputView)}.
+ *
+ * <p>The snapshot written with this method can be read via {@link
#readVersionedSnapshot(DataInputView, ClassLoader)}.
+ */
+ static void writeVersionedSnapshot(DataOutputView out,
TypeSerializerSnapshot<?> snapshot) throws IOException {
+ out.writeUTF(snapshot.getClass().getName());
+ out.writeInt(snapshot.getCurrentVersion());
+ snapshot.writeSnapshot(out);
+ }
+
+ /**
+ * Reads a snapshot from the stream, resolving and instantiating the snapshot class.
+ *
+ * <p>This method reads snapshots written by {@link
#writeVersionedSnapshot(DataOutputView, TypeSerializerSnapshot)}.
+ */
+ static <T> TypeSerializerSnapshot<T>
readVersionedSnapshot(DataInputView in, ClassLoader cl) throws IOException {
+ final TypeSerializerSnapshot<T> snapshot =
+
TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(in, cl);
+
+ final int version = in.readInt();
+ snapshot.readSnapshot(version, in, cl);
+
+ return snapshot;
+ }
}
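Taken together, the intended write/restore flow with these utilities might look like the following sketch (variable names are illustrative; error handling elided):

	// write side, e.g. while taking a checkpoint
	TypeSerializerSnapshot.writeVersionedSnapshot(out, serializer.snapshotConfiguration());

	// read side, when restoring with a possibly updated serializer
	TypeSerializerSnapshot<T> snapshot = TypeSerializerSnapshot.readVersionedSnapshot(in, userCodeClassLoader);
	TypeSerializerSchemaCompatibility<T, TypeSerializer<T>> compatibility =
		snapshot.resolveSchemaCompatibility(newSerializer);

	if (compatibility.isCompatibleAsIs()) {
		// keep using newSerializer as-is
	} else if (compatibility.isCompatibleAfterMigration()) {
		// read the old data with snapshot.restoreSerializer(), re-write it with newSerializer
	} else {
		// incompatible: newSerializer cannot be used for this state
	}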
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java
index 0bcff93b802..8e12e301cbb 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.common.typeutils;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.io.VersionedIOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -25,8 +26,11 @@
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
+
import java.io.IOException;
+import static org.apache.flink.util.Preconditions.checkState;
+
/**
* Utility methods for serialization of {@link TypeSerializerSnapshot}.
*/
@@ -42,8 +46,6 @@
* @param serializerSnapshot the serializer configuration snapshot to
write
* @param serializer the prior serializer. This needs to be written of
the serializer snapshot
* if the serializer snapshot is still the legacy
{@link TypeSerializerConfigSnapshot}.
- *
- * @throws IOException
*/
public static <T> void writeSerializerSnapshot(
DataOutputView out,
@@ -63,8 +65,6 @@
* restoring from a snapshot taken with
Flink version <= 1.6.
*
* @return the read serializer configuration snapshot
- *
- * @throws IOException
*/
public static <T> TypeSerializerSnapshot<T> readSerializerSnapshot(
DataInputView in,
@@ -78,6 +78,30 @@
return proxy.getSerializerSnapshot();
}
+
+ public static <T> TypeSerializerSnapshot<T>
readAndInstantiateSnapshotClass(DataInputView in, ClassLoader cl) throws
IOException {
+ final String className = in.readUTF();
+
+ final Class<? extends TypeSerializerSnapshot> rawClazz;
+ try {
+ rawClazz = Class
+ .forName(className, false, cl)
+
.asSubclass(TypeSerializerSnapshot.class);
+ }
+ catch (ClassNotFoundException e) {
+ throw new IOException(
+ "Could not find requested
TypeSerializerSnapshot class '" + className + "' in classpath.", e);
+ }
+ catch (ClassCastException e) {
+ throw new IOException("The class '" + className + "' is
not a subclass of TypeSerializerSnapshot.", e);
+ }
+
+ @SuppressWarnings("unchecked")
+ final Class<? extends TypeSerializerSnapshot<T>> clazz =
(Class<? extends TypeSerializerSnapshot<T>>) rawClazz;
+
+ return InstantiationUtil.instantiate(clazz);
+ }
+
/**
* Utility serialization proxy for a {@link TypeSerializerSnapshot}.
*/
@@ -87,8 +111,11 @@
private ClassLoader userCodeClassLoader;
private TypeSerializerSnapshot<T> serializerSnapshot;
- private TypeSerializer<T> serializer;
+ @Nullable private TypeSerializer<T> serializer;
+ /**
+ * Constructor for reading serializers.
+ */
TypeSerializerSnapshotSerializationProxy(
ClassLoader userCodeClassLoader,
@Nullable TypeSerializer<T> existingPriorSerializer) {
@@ -96,6 +123,9 @@
this.serializer = existingPriorSerializer;
}
+ /**
+ * Constructor for writing out serializers.
+ */
TypeSerializerSnapshotSerializationProxy(
TypeSerializerSnapshot<T> serializerConfigSnapshot,
TypeSerializer<T> serializer) {
@@ -107,91 +137,39 @@
* Binary format layout of a written serializer snapshot is as
follows:
*
* <ul>
- * <li>1. Serializer snapshot classname (UTF).</li>
- * <li>2. The originating serializer of the snapshot, if
any, written via Java serialization.
- * Presence of the serializer is indicated by a flag
(boolean -> TypeSerializer).</li>
- * <li>3. The version of the serializer snapshot's binary
format.</li>
- * <li>4. The actual serializer snapshot.</li>
+ * <li>1. Format version of this util.</li>
+ * <li>2. Name of the TypeSerializerSnapshot class.</li>
+ * <li>3. The version of the TypeSerializerSnapshot's
binary format.</li>
+ * <li>4. The actual serializer snapshot data.</li>
* </ul>
*/
+ @SuppressWarnings("deprecation")
@Override
public void write(DataOutputView out) throws IOException {
- super.write(out);
+ setSerializerForWriteIfOldPath(serializerSnapshot,
serializer);
- // config snapshot class, so that we can re-instantiate
the
- // correct type of config snapshot instance when
deserializing
- out.writeUTF(serializerSnapshot.getClass().getName());
+ // write the format version of this util's format
+ super.write(out);
- if (serializerSnapshot instanceof
TypeSerializerConfigSnapshot) {
- // backwards compatible path, where the
serializer snapshot is still using the
- // deprecated interface; the originating
serializer needs to be written to the byte stream
- out.writeBoolean(true);
- @SuppressWarnings("unchecked")
- TypeSerializerConfigSnapshot<T>
legacySerializerSnapshot = (TypeSerializerConfigSnapshot<T>) serializerSnapshot;
-
TypeSerializerSerializationUtil.writeSerializer(out, serializer);
-
- // TypeSerializerConfigSnapshot includes the
version number implicitly when it is written
- legacySerializerSnapshot.write(out);
- } else {
- out.writeBoolean(false);
-
-
out.writeInt(serializerSnapshot.getCurrentVersion());
- serializerSnapshot.write(out);
- }
+ TypeSerializerSnapshot.writeVersionedSnapshot(out,
serializerSnapshot);
}
@SuppressWarnings("unchecked")
@Override
public void read(DataInputView in) throws IOException {
+ // read version
super.read(in);
-
- String serializerConfigClassname = in.readUTF();
- Class<? extends TypeSerializerSnapshot>
serializerConfigSnapshotClass;
- try {
- serializerConfigSnapshotClass = (Class<?
extends TypeSerializerSnapshot>)
-
Class.forName(serializerConfigClassname, false, userCodeClassLoader);
- } catch (ClassNotFoundException e) {
- throw new IOException(
- "Could not find requested
TypeSerializerConfigSnapshot class "
- + serializerConfigClassname +
" in classpath.", e);
- }
-
- serializerSnapshot =
InstantiationUtil.instantiate(serializerConfigSnapshotClass);
-
- if (getReadVersion() >= 2) {
- // Flink version after 1.7
-
- boolean containsPriorSerializer =
in.readBoolean();
-
- TypeSerializer<T> priorSerializer =
(containsPriorSerializer)
- ?
TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, true)
- : null;
-
- if (serializerSnapshot instanceof
TypeSerializerConfigSnapshot) {
- if (priorSerializer != null) {
-
((TypeSerializerConfigSnapshot<T>)
serializerSnapshot).setPriorSerializer(priorSerializer);
-
((TypeSerializerConfigSnapshot<T>)
serializerSnapshot).setUserCodeClassLoader(userCodeClassLoader);
-
((TypeSerializerConfigSnapshot<T>) serializerSnapshot).read(in);
- } else {
- // this occurs if the user
changed a TypeSerializerSnapshot to the
- // legacy
TypeSerializerConfigSnapshot, which isn't supported.
- throw new IOException("Cannot
read a legacy TypeSerializerConfigSnapshot without the prior serializer
present. ");
- }
- } else {
- int readVersion = in.readInt();
- serializerSnapshot.read(readVersion,
in, userCodeClassLoader);
- }
- } else {
- // Flink version before 1.7.x, and after 1.3.x
-
- if (serializerSnapshot instanceof
TypeSerializerConfigSnapshot) {
- ((TypeSerializerConfigSnapshot<T>)
serializerSnapshot).setPriorSerializer(this.serializer);
- ((TypeSerializerConfigSnapshot<T>)
serializerSnapshot).setUserCodeClassLoader(userCodeClassLoader);
- ((TypeSerializerConfigSnapshot<T>)
serializerSnapshot).read(in);
- } else {
- int readVersion = in.readInt();
- serializerSnapshot.read(readVersion,
in, userCodeClassLoader);
- }
+ final int version = getReadVersion();
+
+ switch (version) {
+ case 2:
+ serializerSnapshot = deserializeV2(in,
userCodeClassLoader);
+ break;
+ case 1:
+ serializerSnapshot = deserializeV1(in,
userCodeClassLoader, serializer);
+ break;
+ default:
+ throw new IOException("Unrecognized
version for TypeSerializerSnapshot format: " + version);
}
}
@@ -208,5 +186,57 @@ public int getVersion() {
TypeSerializerSnapshot<T> getSerializerSnapshot() {
return serializerSnapshot;
}
+
+ /**
+ * Deserialization path for Flink versions 1.7+.
+ */
+ @VisibleForTesting
+ static <T> TypeSerializerSnapshot<T>
deserializeV2(DataInputView in, ClassLoader cl) throws IOException {
+ return TypeSerializerSnapshot.readVersionedSnapshot(in,
cl);
+ }
+
+ /**
+ * Deserialization path for Flink versions in [1.4, 1.6].
+ */
+ @VisibleForTesting
+ @SuppressWarnings("deprecation")
+ static <T> TypeSerializerSnapshot<T> deserializeV1(
+ DataInputView in,
+ ClassLoader cl,
+ @Nullable TypeSerializer<T> serializer) throws
IOException {
+
+ TypeSerializerSnapshot<T> snapshot =
readAndInstantiateSnapshotClass(in, cl);
+
+ // if the snapshot was created before Flink 1.7, we
need to distinguish the following cases:
+ // - old snapshot type that needs serializer from the
outside
+ // - new snapshot type that understands the old
format and can produce a restore serializer from it
+ if (snapshot instanceof TypeSerializerConfigSnapshot) {
+ TypeSerializerConfigSnapshot<T> oldTypeSnapshot
= (TypeSerializerConfigSnapshot<T>) snapshot;
+ oldTypeSnapshot.setPriorSerializer(serializer);
+ oldTypeSnapshot.setUserCodeClassLoader(cl);
+ oldTypeSnapshot.read(in);
+ }
+ else {
+ // new type, simple case
+ int readVersion = in.readInt();
+ snapshot.readSnapshot(readVersion, in, cl);
+ }
+
+ return snapshot;
+ }
+
+ @SuppressWarnings("deprecation")
+ private static <T> void setSerializerForWriteIfOldPath(
+ TypeSerializerSnapshot<T> serializerSnapshot,
+ TypeSerializer<T> serializer) {
+
+ // for compatibility with non-upgraded serializers, put
the serializer into the
+ // config snapshot if it is of the old version
+ if (serializerSnapshot instanceof
TypeSerializerConfigSnapshot) {
+ checkState(serializer != null);
+
+ ((TypeSerializerConfigSnapshot<T>)
serializerSnapshot).setPriorSerializer(serializer);
+ }
+ }
}
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
index 448c53b209e..fe55be79e12 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
@@ -22,6 +22,7 @@
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InvalidClassException;
import java.util.Arrays;
@@ -33,16 +34,30 @@
public class UnloadableDummyTypeSerializer<T> extends TypeSerializer<T> {
private static final long serialVersionUID = 2526330533671642711L;
+
private final byte[] actualBytes;
+ @Nullable
+ private final Throwable originalError;
+
public UnloadableDummyTypeSerializer(byte[] actualBytes) {
+ this(actualBytes, null);
+ }
+
+ public UnloadableDummyTypeSerializer(byte[] actualBytes, @Nullable
Throwable originalError) {
this.actualBytes = Preconditions.checkNotNull(actualBytes);
+ this.originalError = originalError;
}
public byte[] getActualBytes() {
return actualBytes;
}
+ @Nullable
+ public Throwable getOriginalError() {
+ return originalError;
+ }
+
@Override
public boolean isImmutableType() {
throw new UnsupportedOperationException("This object is a dummy
TypeSerializer.");
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index a79ac1de6fc..9c6bcb50bd5 100644
---
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -20,7 +20,6 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -121,12 +120,8 @@ public void testSnapshotConfigurationAndReconfigure()
throws Exception {
new DataInputViewStreamWrapper(in),
Thread.currentThread().getContextClassLoader(), getSerializer());
}
- CompatibilityResult strategy =
getSerializer().ensureCompatibility(restoredConfig);
- assertFalse(strategy.isRequiresMigration());
-
- // also verify that the serializer's reconfigure implementation
detects incompatibility
- strategy = getSerializer().ensureCompatibility(new
TestIncompatibleSerializerConfigSnapshot<>());
- assertTrue(strategy.isRequiresMigration());
+ TypeSerializerSchemaCompatibility<T, ? extends
TypeSerializer<T>> strategy =
restoredConfig.resolveSchemaCompatibility(getSerializer());
+ assertTrue(strategy.isCompatibleAsIs());
}
@Test
@@ -543,23 +538,6 @@ public void skipBytesToRead(int numBytes) throws
IOException {
}
}
- public static final class TestIncompatibleSerializerConfigSnapshot<T>
extends TypeSerializerConfigSnapshot<T> {
- @Override
- public int getVersion() {
- return 0;
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj instanceof
TestIncompatibleSerializerConfigSnapshot;
- }
-
- @Override
- public int hashCode() {
- return getClass().hashCode();
- }
- }
-
private static <T> void checkToString(T value) {
if (value != null) {
value.toString();
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
index 8f82ea88ed9..0fae2699807 100644
---
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
@@ -20,6 +20,7 @@
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -106,8 +107,8 @@ public void testConfigurationSnapshotSerialization() throws
Exception {
new DataInputViewStreamWrapper(in),
Thread.currentThread().getContextClassLoader(), serializer);
}
- CompatibilityResult<PublicEnum> compatResult =
serializer.ensureCompatibility(restoredConfig);
- assertFalse(compatResult.isRequiresMigration());
+ TypeSerializerSchemaCompatibility<PublicEnum, ?> compatResult =
restoredConfig.resolveSchemaCompatibility(serializer);
+ assertTrue(compatResult.isCompatibleAsIs());
assertEquals(PublicEnum.FOO.ordinal(),
serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
assertEquals(PublicEnum.BAR.ordinal(),
serializer.getValueToOrdinal().get(PublicEnum.BAR).intValue());
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
index b4f983382f5..e906f62f65b 100644
---
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.api.common.typeutils.base;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -56,7 +56,7 @@
*/
@Test
public void checkIndenticalEnums() throws Exception {
- Assert.assertFalse(checkCompatibility(ENUM_A,
ENUM_A).isRequiresMigration());
+ Assert.assertTrue(checkCompatibility(ENUM_A,
ENUM_A).isCompatibleAsIs());
}
/**
@@ -64,7 +64,7 @@ public void checkIndenticalEnums() throws Exception {
*/
@Test
public void checkAppendedField() throws Exception {
- Assert.assertFalse(checkCompatibility(ENUM_A,
ENUM_B).isRequiresMigration());
+ Assert.assertTrue(checkCompatibility(ENUM_A,
ENUM_B).isCompatibleAsIs());
}
/**
@@ -72,7 +72,7 @@ public void checkAppendedField() throws Exception {
*/
@Test
public void checkRemovedField() throws Exception {
- Assert.assertTrue(checkCompatibility(ENUM_A,
ENUM_C).isRequiresMigration());
+ Assert.assertTrue(checkCompatibility(ENUM_A,
ENUM_C).isIncompatible());
}
/**
@@ -80,11 +80,11 @@ public void checkRemovedField() throws Exception {
*/
@Test
public void checkDifferentFieldOrder() throws Exception {
- Assert.assertFalse(checkCompatibility(ENUM_A,
ENUM_D).isRequiresMigration());
+ Assert.assertTrue(checkCompatibility(ENUM_A,
ENUM_D).isCompatibleAsIs());
}
@SuppressWarnings("unchecked")
- private static CompatibilityResult checkCompatibility(String
enumSourceA, String enumSourceB)
+ private static TypeSerializerSchemaCompatibility
checkCompatibility(String enumSourceA, String enumSourceB)
throws IOException, ClassNotFoundException {
ClassLoader classLoader = compileAndLoadEnum(
@@ -116,7 +116,7 @@ private static CompatibilityResult
checkCompatibility(String enumSourceA, String
}
EnumSerializer enumSerializer2 = new
EnumSerializer(classLoader2.loadClass(ENUM_NAME));
- return enumSerializer2.ensureCompatibility(restoredSnapshot);
+ return
restoredSnapshot.resolveSchemaCompatibility(enumSerializer2);
}
private static ClassLoader compileAndLoadEnum(File root, String
filename, String source) throws IOException {
diff --git
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 9620846a4a6..4f54c3ee4d9 100644
---
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -34,13 +34,13 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
@@ -63,7 +63,6 @@
import org.junit.Test;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
@@ -311,8 +310,10 @@ public void testReconfigureWithDifferentPojoType() throws
Exception {
new DataInputViewStreamWrapper(in),
Thread.currentThread().getContextClassLoader(), pojoSerializer2);
}
- CompatibilityResult<SubTestUserClassA> compatResult =
pojoSerializer2.ensureCompatibility(pojoSerializerConfigSnapshot);
- assertTrue(compatResult.isRequiresMigration());
+ @SuppressWarnings("unchecked")
+ TypeSerializerSchemaCompatibility<SubTestUserClassA, ?>
compatResult =
+
pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer2);
+ assertTrue(compatResult.isIncompatible());
}
/**
@@ -352,8 +353,10 @@ public void
testReconfigureDifferentSubclassRegistrationOrder() throws Exception
new DataInputViewStreamWrapper(in),
Thread.currentThread().getContextClassLoader(), pojoSerializer);
}
- CompatibilityResult<TestUserClass> compatResult =
pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
- assertTrue(!compatResult.isRequiresMigration());
+ @SuppressWarnings("unchecked")
+ TypeSerializerSchemaCompatibility<TestUserClass, ?>
compatResult =
+
pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
+ assertTrue(compatResult.isCompatibleAsIs());
// reconfigure - check reconfiguration result and that
registration ids remains the same
//assertEquals(ReconfigureResult.COMPATIBLE,
pojoSerializer.reconfigure(pojoSerializerConfigSnapshot));
@@ -397,8 +400,10 @@ public void
testReconfigureRepopulateNonregisteredSubclassSerializerCache() thro
}
// reconfigure - check reconfiguration result and that subclass
serializer cache is repopulated
- CompatibilityResult<TestUserClass> compatResult =
pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
- assertFalse(compatResult.isRequiresMigration());
+ @SuppressWarnings("unchecked")
+ TypeSerializerSchemaCompatibility<TestUserClass, ?>
compatResult =
+
pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
+ assertTrue(compatResult.isCompatibleAsIs());
assertEquals(2,
pojoSerializer.getSubclassSerializerCache().size());
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
@@ -460,8 +465,10 @@ public void
testReconfigureWithPreviouslyNonregisteredSubclasses() throws Except
// reconfigure - check reconfiguration result and that
// 1) subclass serializer cache is repopulated
// 2) registrations also contain the now registered subclasses
- CompatibilityResult<TestUserClass> compatResult =
pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
- assertFalse(compatResult.isRequiresMigration());
+ @SuppressWarnings("unchecked")
+ TypeSerializerSchemaCompatibility<TestUserClass, ?>
compatResult =
+
pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
+ assertTrue(compatResult.isCompatibleAsIs());
assertEquals(2,
pojoSerializer.getSubclassSerializerCache().size());
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
@@ -537,8 +544,9 @@ public void testReconfigureWithDifferentFieldOrder() throws
Exception {
new HashMap<>()); // empty; irrelevant for this
test
// reconfigure - check reconfiguration result and that fields
are reordered to the previous order
- CompatibilityResult<TestUserClass> compatResult =
pojoSerializer.ensureCompatibility(mockPreviousConfigSnapshot);
- assertFalse(compatResult.isRequiresMigration());
+ TypeSerializerSchemaCompatibility<TestUserClass, ?>
compatResult =
+
mockPreviousConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
+ assertTrue(compatResult.isCompatibleAsIs());
int i = 0;
for (Field field : mockOriginalFieldOrder) {
assertEquals(field, pojoSerializer.getFields()[i]);
@@ -580,7 +588,7 @@ public void testSerializerSerializationFailureResilience()
throws Exception{
pojoSerializer);
}
-
Assert.assertFalse(pojoSerializer.ensureCompatibility(deserializedConfig).isRequiresMigration());
+
Assert.assertTrue(deserializedConfig.resolveSchemaCompatibility(pojoSerializer).isCompatibleAsIs());
verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(config,
deserializedConfig);
}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
index 3c9d990fb31..869941e9174 100644
---
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
+++
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -19,8 +19,8 @@
package org.apache.flink.api.java.typeutils.runtime.kryo;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -44,7 +44,6 @@
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -67,8 +66,11 @@ public void testMigrationStrategyForRemovedAvroDependency()
throws Exception {
kryoSerializerConfigSnapshot =
TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
new DataInputViewStreamWrapper(in),
Thread.currentThread().getContextClassLoader(), kryoSerializerForA);
}
- CompatibilityResult<TestClass> compatResult =
kryoSerializerForA.ensureCompatibility(kryoSerializerConfigSnapshot);
- assertFalse(compatResult.isRequiresMigration());
+
+ @SuppressWarnings("unchecked")
+ TypeSerializerSchemaCompatibility<TestClass, ?> compatResult =
+
kryoSerializerConfigSnapshot.resolveSchemaCompatibility(kryoSerializerForA);
+ assertTrue(compatResult.isCompatibleAsIs());
}
@Test
@@ -111,8 +113,10 @@ public void testMigrationStrategyWithDifferentKryoType()
throws Exception {
new DataInputViewStreamWrapper(in),
Thread.currentThread().getContextClassLoader(), kryoSerializerForB);
}
- CompatibilityResult<TestClassB> compatResult =
kryoSerializerForB.ensureCompatibility(kryoSerializerConfigSnapshot);
- assertTrue(compatResult.isRequiresMigration());
+ @SuppressWarnings("unchecked")
+ TypeSerializerSchemaCompatibility<TestClassB, ?> compatResult =
+
kryoSerializerConfigSnapshot.resolveSchemaCompatibility(kryoSerializerForB);
+ assertTrue(compatResult.isIncompatible());
}
@Test
@@ -272,8 +276,10 @@ public void
testMigrationStrategyForDifferentRegistrationOrder() throws Exceptio
}
// reconfigure - check reconfiguration result and that
registration id remains the same
- CompatibilityResult<TestClass> compatResult =
kryoSerializer.ensureCompatibility(kryoSerializerConfigSnapshot);
- assertFalse(compatResult.isRequiresMigration());
+ @SuppressWarnings("unchecked")
+ TypeSerializerSchemaCompatibility<TestClass, ?> compatResult =
+
kryoSerializerConfigSnapshot.resolveSchemaCompatibility(kryoSerializer);
+ assertTrue(compatResult.isCompatibleAsIs());
assertEquals(testClassId,
kryoSerializer.getKryo().getRegistration(TestClass.class).getId());
assertEquals(testClassAId,
kryoSerializer.getKryo().getRegistration(TestClassA.class).getId());
assertEquals(testClassBId,
kryoSerializer.getKryo().getRegistration(TestClassB.class).getId());
diff --git
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
new file mode 100644
index 00000000000..1dd56a775aa
--- /dev/null
+++
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+
+/**
+ * IT case for the {@link AvroExternalJarProgram}.
+ */
+public class LegacyAvroExternalJarProgramITCase extends TestLogger {
+
+ private static final String JAR_FILE = "maven-test-jar.jar";
+
+ private static final String TEST_DATA_FILE = "/testdata.avro";
+
+ @Test
+ public void testExternalProgram() {
+
+ LocalFlinkMiniCluster testMiniCluster = null;
+
+ try {
+ int parallelism = 4;
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+ testMiniCluster = new LocalFlinkMiniCluster(config, false);
+ testMiniCluster.start();
+
+ String jarFile = JAR_FILE;
+ String testData = getClass().getResource(TEST_DATA_FILE).toString();
+
+ PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+
+ TestEnvironment.setAsContext(
+ testMiniCluster,
+ parallelism,
+ Collections.singleton(new Path(jarFile)),
+ Collections.<URL>emptyList());
+
+ config.setString(JobManagerOptions.ADDRESS, "localhost");
+ config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
+
+ program.invokeInteractiveModeForExecution();
+ }
+ catch (Throwable t) {
+ System.err.println(t.getMessage());
+ t.printStackTrace();
+ Assert.fail("Error during the packaged program execution: " + t.getMessage());
+ }
+ finally {
+ TestEnvironment.unsetAsContext();
+
+ if (testMiniCluster != null) {
+ try {
+ testMiniCluster.stop();
+ } catch (Throwable t) {
+ // ignore
+ }
+ }
+ }
+ }
+}
diff --git
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
index 7b8763bfa2f..463f8f6f26d 100644
---
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
+++
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
@@ -36,7 +36,6 @@
import java.util.Random;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -105,10 +104,10 @@ public void testCompatibilityWithPojoSerializer() throws Exception {
validateDeserialization(serializer);
// sanity check for the test: check that a PoJoSerializer and the original serializer work together
- assertFalse(serializer.ensureCompatibility(configSnapshot).isRequiresMigration());
+ assertTrue(configSnapshot.resolveSchemaCompatibility(serializer).isCompatibleAsIs());
final TypeSerializer<SimpleUser> newSerializer = new AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
- assertFalse(newSerializer.ensureCompatibility(configSnapshot).isRequiresMigration());
+ assertTrue(configSnapshot.resolveSchemaCompatibility(newSerializer).isCompatibleAsIs());
// deserialize the data and make sure this still works
validateDeserialization(newSerializer);
@@ -116,7 +115,7 @@ public void testCompatibilityWithPojoSerializer() throws Exception {
TypeSerializerSnapshot<SimpleUser> nextSnapshot = newSerializer.snapshotConfiguration();
final TypeSerializer<SimpleUser> nextSerializer = new AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
- assertFalse(nextSerializer.ensureCompatibility(nextSnapshot).isRequiresMigration());
+ assertTrue(nextSnapshot.resolveSchemaCompatibility(nextSerializer).isCompatibleAsIs());
// deserialize the data and make sure this still works
validateDeserialization(nextSerializer);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index eae5a3bccdd..ae4fbaa133b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -24,9 +24,9 @@
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
@@ -228,42 +228,32 @@ public void dispose() {
final StateMetaInfoSnapshot metaInfoSnapshot = restoredBroadcastStateMetaInfos.get(name);
- @SuppressWarnings("unchecked")
- RegisteredBroadcastStateBackendMetaInfo<K, V> restoredMetaInfo = new RegisteredBroadcastStateBackendMetaInfo<K, V>(metaInfoSnapshot);
+ // check whether new serializers are incompatible
+ TypeSerializerSnapshot<K> keySerializerSnapshot = Preconditions.checkNotNull(
+ (TypeSerializerSnapshot<K>) metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER));
- // check compatibility to determine if state migration is required
- CompatibilityResult<K> keyCompatibility = CompatibilityUtil.resolveCompatibilityResult(
- restoredMetaInfo.getKeySerializer(),
- UnloadableDummyTypeSerializer.class,
- //TODO this keys should not be exposed and should be adapted after FLINK-9377 was merged
- metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
- broadcastStateKeySerializer);
-
- CompatibilityResult<V> valueCompatibility = CompatibilityUtil.resolveCompatibilityResult(
- restoredMetaInfo.getValueSerializer(),
- UnloadableDummyTypeSerializer.class,
- //TODO this keys should not be exposed and should be adapted after FLINK-9377 was merged
- metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
- broadcastStateValueSerializer);
-
- if (!keyCompatibility.isRequiresMigration() && !valueCompatibility.isRequiresMigration()) {
- // new serializer is compatible; use it to replace the old serializer
- broadcastState.setStateMetaInfo(
- new RegisteredBroadcastStateBackendMetaInfo<>(
- name,
- OperatorStateHandle.Mode.BROADCAST,
- broadcastStateKeySerializer,
- broadcastStateValueSerializer));
- } else {
- // TODO state migration currently isn't possible.
-
- // NOTE: for heap backends, it is actually fine to proceed here without failing the restore,
- // since the state has already been deserialized to objects and we can just continue with
- // the new serializer; we're deliberately failing here for now to have equal functionality with
- // the RocksDB backend to avoid confusion for users.
-
- throw StateMigrationException.notSupported();
+ TypeSerializerSchemaCompatibility<K, ?> keyCompatibility =
+ keySerializerSnapshot.resolveSchemaCompatibility(broadcastStateKeySerializer);
+ if (keyCompatibility.isIncompatible()) {
+ throw new StateMigrationException("The new key serializer for broadcast state must not be incompatible.");
+ }
+
+ TypeSerializerSnapshot<V> valueSerializerSnapshot = Preconditions.checkNotNull(
+ (TypeSerializerSnapshot<V>) metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+
+ TypeSerializerSchemaCompatibility<V, ?> valueCompatibility =
+ valueSerializerSnapshot.resolveSchemaCompatibility(broadcastStateValueSerializer);
+ if (valueCompatibility.isIncompatible()) {
+ throw new StateMigrationException("The new value serializer for broadcast state must not be incompatible.");
}
+
+ // new serializer is compatible; use it to replace the old serializer
+ broadcastState.setStateMetaInfo(
+ new RegisteredBroadcastStateBackendMetaInfo<>(
+ name,
+ OperatorStateHandle.Mode.BROADCAST,
+ broadcastStateKeySerializer,
+ broadcastStateValueSerializer));
}
accessedBroadcastStatesByName.put(name, broadcastState);
@@ -606,27 +596,19 @@ public void addAll(List<S> values) {
// check compatibility to determine if state migration is required
TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
- CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
- metaInfo.getPartitionStateSerializer(),
- UnloadableDummyTypeSerializer.class,
- //TODO this keys should not be exposed and should be adapted after FLINK-9377 was merged
- restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
- newPartitionStateSerializer);
-
- if (!stateCompatibility.isRequiresMigration()) {
- // new serializer is compatible; use it to replace the old serializer
- partitionableListState.setStateMetaInfo(
- new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
- } else {
- // TODO state migration currently isn't possible.
-
- // NOTE: for heap backends, it is actually fine to proceed here without failing the restore,
- // since the state has already been deserialized to objects and we can just continue with
- // the new serializer; we're deliberately failing here for now to have equal functionality with
- // the RocksDB backend to avoid confusion for users.
-
- throw StateMigrationException.notSupported();
+
+ @SuppressWarnings("unchecked")
+ TypeSerializerSnapshot<S> stateSerializerSnapshot = Preconditions.checkNotNull(
+ (TypeSerializerSnapshot<S>) restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+
+ TypeSerializerSchemaCompatibility<S, ?> stateCompatibility =
+ stateSerializerSnapshot.resolveSchemaCompatibility(newPartitionStateSerializer);
+ if (stateCompatibility.isIncompatible()) {
+ throw new StateMigrationException("The new state serializer for operator state must not be incompatible.");
}
+
+ partitionableListState.setStateMetaInfo(
+ new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
}
accessedStatesByName.put(name, partitionableListState);
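Both restore paths in DefaultOperatorStateBackend now share the same fail-fast shape; as a condensed sketch (the snapshot lookup and serializer variables are taken from the surrounding code, S stands for the state type):

    @SuppressWarnings("unchecked")
    TypeSerializerSnapshot<S> restoredSerializerSnapshot = Preconditions.checkNotNull(
        (TypeSerializerSnapshot<S>) metaInfoSnapshot.getTypeSerializerConfigSnapshot(
            StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));

    if (restoredSerializerSnapshot.resolveSchemaCompatibility(newSerializer).isIncompatible()) {
        // only hard incompatibility aborts the restore
        throw new StateMigrationException("The new state serializer must not be incompatible.");
    }
    // anything else proceeds with the new serializer

Compared to the removed code, the failure condition is narrower: "requires migration" no longer aborts the restore of heap-based operator state, only a genuinely incompatible serializer does.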
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index ab6e8b1a046..585490dc75f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -23,7 +23,6 @@
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.VersionedIOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
@@ -63,9 +62,6 @@
/** This specifies if we use a compressed format write the key-groups */
private boolean usingKeyGroupCompression;
- /** This specifies whether or not to use dummy {@link UnloadableDummyTypeSerializer} when serializers cannot be read. */
- private boolean isSerializerPresenceRequired;
-
// TODO the keySerializer field should be removed, once all serializers have the restoreSerializer() method implemented
private TypeSerializer<K> keySerializer;
private TypeSerializerSnapshot<K> keySerializerConfigSnapshot;
@@ -74,9 +70,8 @@
private ClassLoader userCodeClassLoader;
- public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader, boolean isSerializerPresenceRequired) {
+ public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
- this.isSerializerPresenceRequired = isSerializerPresenceRequired;
}
public KeyedBackendSerializationProxy(
@@ -98,10 +93,6 @@ public KeyedBackendSerializationProxy(
return stateMetaInfoSnapshots;
}
- public TypeSerializer<K> restoreKeySerializer() {
- return keySerializerConfigSnapshot.restoreSerializer();
- }
-
public TypeSerializerSnapshot<K> getKeySerializerConfigSnapshot() {
return keySerializerConfigSnapshot;
}
@@ -163,10 +154,6 @@ public void read(DataInputView in) throws IOException {
}
this.keySerializer = null;
- if (isSerializerPresenceRequired) {
- checkSerializerPresence(this.keySerializerConfigSnapshot.restoreSerializer());
- }
-
Integer metaInfoSnapshotVersion = META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER.get(readVersion);
if (metaInfoSnapshotVersion == null) {
// this should not happen; guard for the future
@@ -181,22 +168,7 @@ public void read(DataInputView in) throws IOException {
for (int i = 0; i < numKvStates; i++) {
StateMetaInfoSnapshot snapshot = stateMetaInfoReader.readStateMetaInfoSnapshot(in, userCodeClassLoader);
- if (isSerializerPresenceRequired) {
- checkSerializerPresence(snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
- checkSerializerPresence(snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
- }
stateMetaInfoSnapshots.add(snapshot);
}
}
-
- private void checkSerializerPresence(TypeSerializer<?> serializer) throws IOException {
- if (serializer instanceof UnloadableDummyTypeSerializer) {
- throw new IOException("Unable to restore keyed state, because a previous serializer" +
- " of the keyed state is not present The serializer could have been removed from the classpath, " +
- " or its implementation have changed and could not be loaded. This is a temporary restriction that will" +
- " be fixed in future versions.");
- }
- }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
index 7f95ed70326..a92527a0371 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
@@ -19,11 +19,9 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
@@ -191,23 +189,25 @@ public int hashCode() {
}
// check compatibility results to determine if state migration is required
- CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
- restoredStateMetaInfoSnapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
- null,
- restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
- StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
- newNamespaceSerializer);
+ @SuppressWarnings("unchecked")
+ TypeSerializerSnapshot<N> namespaceSerializerSnapshot = Preconditions.checkNotNull(
+ (TypeSerializerSnapshot<N>) restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+ StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
+
+ TypeSerializerSchemaCompatibility<N, ?> namespaceCompatibility =
+ namespaceSerializerSnapshot.resolveSchemaCompatibility(newNamespaceSerializer);
TypeSerializer<S> newStateSerializer = newStateDescriptor.getSerializer();
- CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
- restoredStateMetaInfoSnapshot.restoreTypeSerializer(
- StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
- UnloadableDummyTypeSerializer.class,
- restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
- StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
- newStateSerializer);
-
- if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
+
+ @SuppressWarnings("unchecked")
+ TypeSerializerSnapshot<S> stateSerializerSnapshot = Preconditions.checkNotNull(
+ (TypeSerializerSnapshot<S>) restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+ StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+
+ TypeSerializerSchemaCompatibility<S, ?> stateCompatibility =
+ stateSerializerSnapshot.resolveSchemaCompatibility(newStateSerializer);
+
+ if (!namespaceCompatibility.isCompatibleAsIs() || !stateCompatibility.isCompatibleAsIs()) {
// TODO state migration currently isn't possible.
throw StateMigrationException.notSupported();
} else {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
index b7dff59aef0..4132d144a4a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
@@ -42,21 +42,4 @@ public String getName() {
@Nonnull
public abstract StateMetaInfoSnapshot snapshot();
-
- public static RegisteredStateMetaInfoBase fromMetaInfoSnapshot(@Nonnull StateMetaInfoSnapshot snapshot) {
-
- final StateMetaInfoSnapshot.BackendStateType backendStateType = snapshot.getBackendStateType();
- switch (backendStateType) {
- case KEY_VALUE:
- return new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
- case OPERATOR:
- return new RegisteredOperatorStateBackendMetaInfo<>(snapshot);
- case BROADCAST:
- return new RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
- case PRIORITY_QUEUE:
- return new RegisteredPriorityQueueStateBackendMetaInfo<>(snapshot);
- default:
- throw new IllegalArgumentException("Unknown backend state type: " + backendStateType);
- }
- }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 837e51fafc0..6ade53caf10 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -28,10 +28,9 @@
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -206,14 +205,15 @@ public HeapKeyedStateBackend(
StateMetaInfoSnapshot.CommonSerializerKeys serializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER;
- CompatibilityResult<T> compatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
- restoredMetaInfoSnapshot.restoreTypeSerializer(serializerKey),
- null,
- restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey),
- byteOrderedElementSerializer);
+ @SuppressWarnings("unchecked")
+ TypeSerializerSnapshot<T> serializerSnapshot = Preconditions.checkNotNull(
+ (TypeSerializerSnapshot<T>) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey));
+
+ TypeSerializerSchemaCompatibility<T, ?> compatibilityResult =
+ serializerSnapshot.resolveSchemaCompatibility(byteOrderedElementSerializer);
- if (compatibilityResult.isRequiresMigration()) {
- throw new FlinkRuntimeException(StateMigrationException.notSupported());
+ if (compatibilityResult.isIncompatible()) {
+ throw new FlinkRuntimeException(new StateMigrationException("For heap backends, the new priority queue serializer must not be incompatible."));
} else {
registeredPQStates.put(
stateName,
@@ -405,26 +405,17 @@ private void restorePartitionedState(Collection<KeyedStateHandle> state) throws
try {
DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
- // isSerializerPresenceRequired flag is set to true, since for the heap state backend,
- // deserialization of state happens eagerly at restore time
KeyedBackendSerializationProxy<K> serializationProxy =
- new KeyedBackendSerializationProxy<>(userCodeClassLoader, true);
+ new KeyedBackendSerializationProxy<>(userCodeClassLoader);
serializationProxy.read(inView);
if (!keySerializerRestored) {
// check for key serializer compatibility; this also reconfigures the
// key serializer to be compatible, if it is required and is possible
- if (CompatibilityUtil.resolveCompatibilityResult(
- serializationProxy.restoreKeySerializer(),
- UnloadableDummyTypeSerializer.class,
- serializationProxy.getKeySerializerConfigSnapshot(),
- keySerializer)
- .isRequiresMigration()) {
-
- // TODO replace with state migration; note that key hash codes need to remain the same after migration
- throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
- "Aborting now since state migration is currently not available");
+ if (!serializationProxy.getKeySerializerConfigSnapshot()
+ .resolveSchemaCompatibility(keySerializer).isCompatibleAsIs()) {
+ throw new StateMigrationException("The new key serializer must be compatible.");
}
keySerializerRestored = true;
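The key-serializer gate in the heap backend is thereby reduced to a single check against the proxy's snapshot; roughly (proxy construction and keySerializer as in the surrounding restore loop):

    KeyedBackendSerializationProxy<K> proxy =
        new KeyedBackendSerializationProxy<>(userCodeClassLoader);
    proxy.read(inView);

    if (!proxy.getKeySerializerConfigSnapshot()
            .resolveSchemaCompatibility(keySerializer).isCompatibleAsIs()) {
        throw new StateMigrationException("The new key serializer must be compatible.");
    }

Note that the isSerializerPresenceRequired flag is gone entirely: since the check runs against the written snapshot, the old serializer class no longer has to be loadable at restore time.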
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 215d7d36c96..d5310412385 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -32,13 +32,13 @@
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FutureUtil;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
-import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.RunnableFuture;
@@ -141,8 +141,8 @@ public void testOperatorStateRestoreFailsIfSerializerDeserializationFails() thro
operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
fail("The operator state restore should have failed if the previous state serializer could not be loaded.");
- } catch (IOException expected) {
- Assert.assertTrue(expected.getMessage().contains("Unable to restore operator state"));
+ } catch (Exception expected) {
+ Assert.assertTrue(ExceptionUtils.findThrowable(expected, ClassNotFoundException.class).isPresent());
} finally {
stateHandle.discardState();
}
@@ -194,8 +194,8 @@ public void testKeyedStateRestoreFailsIfSerializerDeserializationFails() throws
Collections.singleton(StringSerializer.class.getName()))));
fail("The keyed state restore should have failed if the previous state serializer could not be loaded.");
- } catch (IOException expected) {
- Assert.assertTrue(expected.getMessage().contains("Unable to restore keyed state"));
+ } catch (Exception expected) {
+ Assert.assertTrue(ExceptionUtils.findThrowable(expected, ClassNotFoundException.class).isPresent());
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index ab09557691a..4976b302e83 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -41,6 +41,7 @@
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.Preconditions;
@@ -889,8 +890,8 @@ public void testRestoreFailsIfSerializerDeserializationFails() throws Exception
operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
fail("The operator state restore should have failed if the previous state serializer could not be loaded.");
- } catch (IOException expected) {
- Assert.assertTrue(expected.getMessage().contains("Unable to restore operator state"));
+ } catch (Exception expected) {
+ Assert.assertTrue(ExceptionUtils.findThrowable(expected, ClassNotFoundException.class).isPresent());
} finally {
stateHandle.discardState();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 97665518d38..7858b5c74fc 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -20,7 +20,6 @@
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -34,6 +33,7 @@
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
+import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -72,14 +72,13 @@ public void testKeyedBackendSerializationProxyRoundtrip() throws Exception {
}
serializationProxy =
- new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader(), true);
+ new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader());
try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
serializationProxy.read(new DataInputViewStreamWrapper(in));
}
Assert.assertTrue(serializationProxy.isUsingKeyGroupCompression());
- Assert.assertEquals(keySerializer, serializationProxy.restoreKeySerializer());
Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
assertEqualStateMetaInfoSnapshotsLists(stateMetaInfoList, serializationProxy.getStateMetaInfoSnapshots());
}
@@ -120,21 +119,24 @@ public void testKeyedBackendSerializationProxyRoundtripWithSerializerSerializati
new KeyedBackendSerializationProxy<>(
new ArtificialCNFExceptionThrowingClassLoader(
Thread.currentThread().getContextClassLoader(),
- cnfThrowingSerializerClasses),
- false);
+ cnfThrowingSerializerClasses));
try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
serializationProxy.read(new DataInputViewStreamWrapper(in));
}
Assert.assertEquals(true, serializationProxy.isUsingKeyGroupCompression());
- Assert.assertTrue(serializationProxy.restoreKeySerializer() instanceof UnloadableDummyTypeSerializer);
Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
for (StateMetaInfoSnapshot snapshot : serializationProxy.getStateMetaInfoSnapshots()) {
- final RegisteredKeyValueStateBackendMetaInfo<?, ?> restoredMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
- Assert.assertTrue(restoredMetaInfo.getNamespaceSerializer() instanceof UnloadableDummyTypeSerializer);
- Assert.assertTrue(restoredMetaInfo.getStateSerializer() instanceof UnloadableDummyTypeSerializer);
+ try {
+ // creating a registered meta info from the snapshot would fail, because the serializer snapshots
+ // cannot create a proper restore serializer
+ new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
+ } catch (Exception e) {
+ Assert.assertTrue(ExceptionUtils.findThrowable(e, ClassNotFoundException.class).isPresent());
+ }
+
Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
Assert.assertEquals(stateSerializer.snapshotConfiguration(), snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
}
@@ -168,7 +170,7 @@ public void testKeyedStateMetaInfoSerialization() throws Exception {
}
@Test
- public void testKeyedStateMetaInfoReadSerializerFailureResilience() throws Exception {
+ public void testKeyedStateMetaInfoReadWithSerializerSerializationFailure() throws Exception {
String name = "test";
TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
@@ -198,11 +200,15 @@ public void testKeyedStateMetaInfoReadSerializerFailureResilience() throws Excep
new DataInputViewStreamWrapper(in),
classLoader);
}
- RegisteredKeyValueStateBackendMetaInfo<?, ?> restoredMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
+ try {
+ // creating a registered meta info from the snapshot would fail, because the serializer snapshots
+ // cannot create a proper restore serializer
+ new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
+ } catch (Exception e) {
+ Assert.assertTrue(ExceptionUtils.findThrowable(e, ClassNotFoundException.class).isPresent());
+ }
- Assert.assertEquals(name, restoredMetaInfo.getName());
- Assert.assertTrue(restoredMetaInfo.getNamespaceSerializer() instanceof UnloadableDummyTypeSerializer);
- Assert.assertTrue(restoredMetaInfo.getStateSerializer() instanceof UnloadableDummyTypeSerializer);
+ Assert.assertEquals(name, snapshot.getName());
Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
Assert.assertEquals(stateSerializer.snapshotConfiguration(), snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
}
@@ -320,7 +326,7 @@ public void testBroadcastStateMetaInfoSerialization() throws Exception {
}
@Test
- public void testOperatorStateMetaInfoReadSerializerFailureResilience() throws Exception {
+ public void testOperatorStateMetaInfoReadWithSerializerSerializationFailure() throws Exception {
String name = "test";
TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
@@ -348,18 +354,22 @@ public void testOperatorStateMetaInfoReadSerializerFailureResilience() throws Ex
snapshot = reader.readStateMetaInfoSnapshot(new DataInputViewStreamWrapper(in), classLoader);
}
- RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
+ try {
+ // creating a registered meta info from the snapshot would fail, because the serializer snapshots
+ // cannot create a proper restore serializer
new RegisteredOperatorStateBackendMetaInfo<>(snapshot);
+ } catch (Exception e) {
+ Assert.assertTrue(ExceptionUtils.findThrowable(e, ClassNotFoundException.class).isPresent());
+ }
- Assert.assertEquals(name, restoredMetaInfo.getName());
- Assert.assertTrue(restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer);
+ Assert.assertEquals(name, snapshot.getName());
Assert.assertEquals(
stateSerializer.snapshotConfiguration(),
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
}
@Test
- public void testBroadcastStateMetaInfoReadSerializerFailureResilience() throws Exception {
+ public void testBroadcastStateMetaInfoReadWithSerializerSerializationFailure() throws Exception {
String broadcastName = "broadcastTest";
TypeSerializer<?> keySerializer = DoubleSerializer.INSTANCE;
TypeSerializer<?> valueSerializer = StringSerializer.INSTANCE;
@@ -393,14 +403,17 @@ public void testBroadcastStateMetaInfoReadSerializerFailureResilience() throws E
snapshot = reader.readStateMetaInfoSnapshot(new DataInputViewStreamWrapper(in), classLoader);
}
- RegisteredBroadcastStateBackendMetaInfo<?, ?> restoredMetaInfo =
+ try {
+ // creating a registered meta info from the snapshot would fail, because the serializer snapshots
+ // cannot create a proper restore serializer
new RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
+ } catch (Exception e) {
+ Assert.assertTrue(ExceptionUtils.findThrowable(e, ClassNotFoundException.class).isPresent());
+ }
- Assert.assertEquals(broadcastName, restoredMetaInfo.getName());
- Assert.assertEquals(OperatorStateHandle.Mode.BROADCAST, restoredMetaInfo.getAssignmentMode());
- Assert.assertTrue(restoredMetaInfo.getKeySerializer() instanceof UnloadableDummyTypeSerializer);
+ Assert.assertEquals(broadcastName, snapshot.getName());
+ Assert.assertEquals(OperatorStateHandle.Mode.BROADCAST.toString(), snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE));
Assert.assertEquals(keySerializer.snapshotConfiguration(), snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER));
- Assert.assertTrue(restoredMetaInfo.getValueSerializer() instanceof UnloadableDummyTypeSerializer);
Assert.assertEquals(valueSerializer.snapshotConfiguration(), snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
}
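The three renamed tests assert the new failure mode identically: constructing the registered meta info forces restoreSerializer() on a snapshot whose serializer class cannot be loaded, and the test searches the resulting cause chain instead of matching an exception message. Schematically (fixture setup elided):

    try {
        // expected to fail: the snapshot cannot create a restore serializer
        new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
    } catch (Exception e) {
        // the ClassNotFoundException may be wrapped, so search the whole cause chain
        Assert.assertTrue(
            ExceptionUtils.findThrowable(e, ClassNotFoundException.class).isPresent());
    }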
diff --git
a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
index 3b8331abdf6..5990857c3cc 100644
---
a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
+++
b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
@@ -21,13 +21,13 @@ package org.apache.flink.api.scala.runtime
import java.io.InputStream
import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil
+import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSerializationUtil, TypeSerializerSnapshot}
import org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.scala.runtime.TupleSerializerCompatibilityTestGenerator._
import org.apache.flink.api.scala.typeutils.CaseClassSerializer
import org.apache.flink.core.memory.DataInputViewStreamWrapper
-import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}
+import org.junit.Assert.{assertEquals, assertNotNull, assertTrue}
import org.junit.Test
/**
@@ -48,8 +48,11 @@ class TupleSerializerCompatibilityTest {
assertEquals(1, deserialized.size)
- val oldSerializer = deserialized.get(0).f0
- val oldConfigSnapshot = deserialized.get(0).f1
+ val oldSerializer: TypeSerializer[TestCaseClass] =
+ deserialized.get(0).f0.asInstanceOf[TypeSerializer[TestCaseClass]]
+
+ val oldConfigSnapshot: TypeSerializerSnapshot[TestCaseClass] =
+ deserialized.get(0).f1.asInstanceOf[TypeSerializerSnapshot[TestCaseClass]]
// test serializer and config snapshot
assertNotNull(oldSerializer)
@@ -61,9 +64,9 @@ class TupleSerializerCompatibilityTest {
val currentSerializer = createTypeInformation[TestCaseClass]
.createSerializer(new ExecutionConfig())
- assertFalse(currentSerializer
- .ensureCompatibility(oldConfigSnapshot)
- .isRequiresMigration)
+ assertTrue(oldConfigSnapshot
+ .resolveSchemaCompatibility(currentSerializer)
+ .isCompatibleAsIs)
// test old data serialization
is.close()
diff --git
a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
index 55e2419a97a..f400dc33612 100644
---
a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
+++
b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.typeutils
import java.io._
import java.net.{URL, URLClassLoader}
-import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializerSnapshotSerializationUtil}
+import org.apache.flink.api.common.typeutils.{TypeSerializerSchemaCompatibility, TypeSerializerSnapshotSerializationUtil}
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
import org.apache.flink.util.TestLogger
import org.apache.flink.util.TestLogger
import org.junit.rules.TemporaryFolder
@@ -84,7 +84,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
*/
@Test
def checkIdenticalEnums(): Unit = {
- assertFalse(checkCompatibility(enumA, enumA).isRequiresMigration)
+ assertTrue(checkCompatibility(enumA, enumA).isCompatibleAsIs)
}
/**
@@ -92,7 +92,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
*/
@Test
def checkAppendedField(): Unit = {
- assertFalse(checkCompatibility(enumA, enumB).isRequiresMigration)
+ assertTrue(checkCompatibility(enumA, enumB).isCompatibleAsIs)
}
/**
@@ -100,7 +100,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
*/
@Test
def checkRemovedField(): Unit = {
- assertTrue(checkCompatibility(enumA, enumC).isRequiresMigration)
+ assertTrue(checkCompatibility(enumA, enumC).isIncompatible)
}
/**
@@ -108,7 +108,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
*/
@Test
def checkDifferentFieldOrder(): Unit = {
- assertTrue(checkCompatibility(enumA, enumD).isRequiresMigration)
+ assertTrue(checkCompatibility(enumA, enumD).isIncompatible)
}
/**
@@ -117,12 +117,12 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
@Test
def checkDifferentIds(): Unit = {
assertTrue(
- "Different ids should cause a migration.",
- checkCompatibility(enumA, enumE).isRequiresMigration)
+ "Different ids should be incompatible.",
+ checkCompatibility(enumA, enumE).isIncompatible)
}
def checkCompatibility(enumSourceA: String, enumSourceB: String)
- : CompatibilityResult[Enumeration#Value] = {
+ : TypeSerializerSchemaCompatibility[Enumeration#Value, _] = {
import EnumValueSerializerUpgradeTest._
val classLoader = compileAndLoadEnum(tempFolder.newFolder(),
s"$enumName.scala", enumSourceA)
@@ -152,7 +152,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
val enum2 = instantiateEnum[Enumeration](classLoader2, enumName)
val enumValueSerializer2 = new EnumValueSerializer(enum2)
- enumValueSerializer2.ensureCompatibility(snapshot2)
+ snapshot2.resolveSchemaCompatibility(enumValueSerializer2)
}
}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a4b4ce80dc2..885d582bbc8 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -27,10 +27,9 @@
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
@@ -724,22 +723,15 @@ private void restoreKVStateMetaData() throws IOException, StateMigrationExceptio
// that the new serializer for states could be compatible, and therefore the restore can continue
// without old serializers required to be present.
KeyedBackendSerializationProxy<K> serializationProxy =
- new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader, false);
+ new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
serializationProxy.read(currentStateHandleInView);
// check for key serializer compatibility; this also reconfigures the
// key serializer to be compatible, if it is required and is possible
- if (CompatibilityUtil.resolveCompatibilityResult(
- serializationProxy.restoreKeySerializer(),
- UnloadableDummyTypeSerializer.class,
- serializationProxy.getKeySerializerConfigSnapshot(),
- rocksDBKeyedStateBackend.keySerializer)
- .isRequiresMigration()) {
-
- // TODO replace with state migration; note that key hash codes need to remain the same after migration
- throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
- "Aborting now since state migration is currently not available");
+ if (!serializationProxy.getKeySerializerConfigSnapshot()
+ .resolveSchemaCompatibility(rocksDBKeyedStateBackend.keySerializer).isCompatibleAsIs()) {
+ throw new StateMigrationException("The new key serializer must be compatible.");
}
this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
@@ -761,15 +753,12 @@ private void restoreKVStateMetaData() throws IOException, StateMigrationExceptio
nameBytes,
rocksDBKeyedStateBackend.columnOptions);
- RegisteredStateMetaInfoBase stateMetaInfo =
- RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(restoredMetaInfo);
-
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
- registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo);
- rocksDBKeyedStateBackend.registerKvStateInformation(stateMetaInfo.getName(), registeredColumn);
+ registeredColumn = new Tuple2<>(columnFamily, null);
+ rocksDBKeyedStateBackend.kvStateInformation.put(restoredMetaInfo.getName(), registeredColumn);
} else {
// TODO with eager state registration in place, check here for serializer migration strategies
@@ -1079,13 +1068,10 @@ private ColumnFamilyHandle getOrRegisterColumnFamilyHandle(
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
if (null == registeredStateMetaInfoEntry) {
- RegisteredStateMetaInfoBase stateMetaInfo =
- RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
-
registeredStateMetaInfoEntry =
new Tuple2<>(
columnFamilyHandle != null ? columnFamilyHandle : stateBackend.db.createColumnFamily(columnFamilyDescriptor),
- stateMetaInfo);
+ null);
stateBackend.registerKvStateInformation(
stateMetaInfoSnapshot.getName(),
@@ -1213,12 +1199,10 @@ private void restoreLocalStateIntoFullInstance(
StateMetaInfoSnapshot stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
- RegisteredStateMetaInfoBase stateMetaInfo =
- RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
stateBackend.registerKvStateInformation(
stateMetaInfoSnapshot.getName(),
- new Tuple2<>(columnFamilyHandle, stateMetaInfo));
+ new Tuple2<>(columnFamilyHandle, null));
}
// use the restore sst files as the base for succeeding checkpoints
@@ -1275,22 +1259,15 @@ private void restoreInstanceDirectoryFromPath(Path source) throws IOException {
// that the new serializer for states could be compatible, and therefore the restore can continue
// without old serializers required to be present.
KeyedBackendSerializationProxy<T> serializationProxy =
- new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader, false);
+ new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
DataInputView in = new DataInputViewStreamWrapper(inputStream);
serializationProxy.read(in);
// check for key serializer compatibility; this also reconfigures the
// key serializer to be compatible, if it is required and is possible
- if (CompatibilityUtil.resolveCompatibilityResult(
- serializationProxy.restoreKeySerializer(),
- UnloadableDummyTypeSerializer.class,
- serializationProxy.getKeySerializerConfigSnapshot(),
- stateBackend.keySerializer)
- .isRequiresMigration()) {
-
- // TODO replace with state migration; note that key hash codes need to remain the same after migration
- throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
- "Aborting now since state migration is currently not available");
+ if (!serializationProxy.getKeySerializerConfigSnapshot()
+ .resolveSchemaCompatibility(stateBackend.keySerializer).isCompatibleAsIs()) {
+ throw new StateMigrationException("The new key serializer must be compatible.");
}
return serializationProxy.getStateMetaInfoSnapshots();
@@ -1622,13 +1599,15 @@ public static RocksIteratorWrapper getRocksIterator(
TypeSerializer<?> metaInfoTypeSerializer = restoredMetaInfoSnapshot.restoreTypeSerializer(serializerKey);
if (metaInfoTypeSerializer != byteOrderedElementSerializer) {
- CompatibilityResult<T> compatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
- metaInfoTypeSerializer,
- null,
- restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey),
- byteOrderedElementSerializer);
+ @SuppressWarnings("unchecked")
+ TypeSerializerSnapshot<T> serializerSnapshot = Preconditions.checkNotNull(
+ (TypeSerializerSnapshot<T>) restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey));
+
+ TypeSerializerSchemaCompatibility<T, ?> compatibilityResult =
+ serializerSnapshot.resolveSchemaCompatibility(byteOrderedElementSerializer);
- if (compatibilityResult.isRequiresMigration()) {
+ // TODO implement proper migration for priority queue state
+ if (compatibilityResult.isIncompatible()) {
throw new FlinkRuntimeException(StateMigrationException.notSupported());
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
index 872a58d2304..f813039ae35 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
@@ -290,12 +290,12 @@ public int getCurrentVersion() {
}
@Override
- public void write(DataOutputView out) throws IOException {
+ public void writeSnapshot(DataOutputView out) throws IOException {
out.writeUTF(configPayload);
}
@Override
- public void read(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+ public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
if (readVersion != 1) {
throw new IllegalStateException("Can not recognize read version: " + readVersion);
}
}
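For snapshot implementors, the rename is mechanical: write/read become writeSnapshot/readSnapshot with unchanged bodies, and the framework keeps storing getCurrentVersion() with the snapshot and feeding it back as readVersion on restore. A minimal sketch along the lines of the test snapshot above (the configPayload field follows the test; the remaining TypeSerializerSnapshot methods are elided):

    @Override
    public int getCurrentVersion() {
        return 1;
    }

    @Override
    public void writeSnapshot(DataOutputView out) throws IOException {
        // persist the serializer configuration
        out.writeUTF(configPayload);
    }

    @Override
    public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
        // readVersion is the getCurrentVersion() value written with the old snapshot
        if (readVersion != 1) {
            throw new IllegalStateException("Can not recognize read version: " + readVersion);
        }
        configPayload = in.readUTF();
    }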
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services