This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1c36d22a7c51e080dcf6766c71677741a1c68194 Author: Igal Shilman <[email protected]> AuthorDate: Tue Feb 26 20:59:36 2019 +0100 [FLINK-11753] [tests] Refactor SchemaCompatibilityTestingSerializer This closes #7845. --- .../typeutils/CompositeTypeSerializerUtilTest.java | 56 ++-- .../PojoSerializerSnapshotMigrationTest.java | 3 +- .../runtime/PojoSerializerSnapshotTest.java | 21 +- .../SchemaCompatibilityTestingSerializer.java | 327 +++++++++------------ 4 files changed, 189 insertions(+), 218 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtilTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtilTest.java index 37edcd7..206fc13 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtilTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtilTest.java @@ -19,8 +19,10 @@ package org.apache.flink.api.common.typeutils; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.IntermediateCompatibilityResult; -import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.testutils.migration.SchemaCompatibilityTestingSerializer; +import org.apache.flink.testutils.migration.SchemaCompatibilityTestingSerializer.SchemaCompatibilityTestingSnapshot; +import org.apache.flink.api.common.typeutils.base.IntSerializer; + import org.junit.Test; import static org.junit.Assert.assertArrayEquals; @@ -38,13 +40,13 @@ public class CompositeTypeSerializerUtilTest { @Test public void testCompatibleAsIsIntermediateCompatibilityResult() { final TypeSerializerSnapshot<?>[] testSerializerSnapshots = new TypeSerializerSnapshot<?>[] { - new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(), - new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(), + 
SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializer("first serializer"), + SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializer("second serializer"), }; final TypeSerializer<?>[] testNewSerializers = new TypeSerializer<?>[] { - new SchemaCompatibilityTestingSerializer.InitialSerializer(), - new SchemaCompatibilityTestingSerializer.InitialSerializer(), + new SchemaCompatibilityTestingSerializer("first serializer"), + new SchemaCompatibilityTestingSerializer("second serializer"), }; IntermediateCompatibilityResult<?> intermediateCompatibilityResult = @@ -58,39 +60,39 @@ public class CompositeTypeSerializerUtilTest { @Test public void testCompatibleWithReconfiguredSerializerIntermediateCompatibilityResult() { final TypeSerializerSnapshot<?>[] testSerializerSnapshots = new TypeSerializerSnapshot<?>[] { - new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(), - new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(), + SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializer("a"), + SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializerAfterReconfiguration("b"), }; final TypeSerializer<?>[] testNewSerializers = new TypeSerializer<?>[] { - new SchemaCompatibilityTestingSerializer.InitialSerializer(), - new SchemaCompatibilityTestingSerializer.ReconfigurationRequiringSerializer<>(), + new SchemaCompatibilityTestingSerializer("a"), + new SchemaCompatibilityTestingSerializer("b"), }; IntermediateCompatibilityResult<?> intermediateCompatibilityResult = CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(testNewSerializers, testSerializerSnapshots); - assertTrue(intermediateCompatibilityResult.isCompatibleWithReconfiguredSerializer()); - final TypeSerializer<?>[] expectedReconfiguredNestedSerializers = new TypeSerializer<?>[] { - new SchemaCompatibilityTestingSerializer.InitialSerializer(), - new 
SchemaCompatibilityTestingSerializer.InitialSerializer(), + new SchemaCompatibilityTestingSerializer("a"), + new SchemaCompatibilityTestingSerializer("b"), }; + + assertTrue(intermediateCompatibilityResult.isCompatibleWithReconfiguredSerializer()); assertArrayEquals(expectedReconfiguredNestedSerializers, intermediateCompatibilityResult.getNestedSerializers()); } @Test public void testCompatibleAfterMigrationIntermediateCompatibilityResult() { final TypeSerializerSnapshot<?>[] testSerializerSnapshots = new TypeSerializerSnapshot<?>[] { - new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(), - new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(), - new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(), + SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializerAfterReconfiguration("a"), + SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializerAfterMigration("b"), + SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializer("c"), }; final TypeSerializer<?>[] testNewSerializers = new TypeSerializer<?>[] { - new SchemaCompatibilityTestingSerializer.ReconfigurationRequiringSerializer<>(), - new SchemaCompatibilityTestingSerializer.UpgradedSchemaSerializer<>(), - new SchemaCompatibilityTestingSerializer.InitialSerializer() + new SchemaCompatibilityTestingSerializer("a"), + new SchemaCompatibilityTestingSerializer("b"), + new SchemaCompatibilityTestingSerializer("c") }; IntermediateCompatibilityResult<?> intermediateCompatibilityResult = @@ -103,17 +105,17 @@ public class CompositeTypeSerializerUtilTest { @Test public void testIncompatibleIntermediateCompatibilityResult() { final TypeSerializerSnapshot<?>[] testSerializerSnapshots = new TypeSerializerSnapshot<?>[] { - new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(), - new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(), - 
new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(), - new SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(), + SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializer(), + SchemaCompatibilityTestingSnapshot.thatIsIncompatibleWithTheNextSerializer(), + SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializerAfterReconfiguration(), + SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializerAfterMigration(), }; final TypeSerializer<?>[] testNewSerializers = new TypeSerializer<?>[] { - new SchemaCompatibilityTestingSerializer.InitialSerializer(), - new SchemaCompatibilityTestingSerializer.IncompatibleSerializer<>(), - new SchemaCompatibilityTestingSerializer.ReconfigurationRequiringSerializer<>(), - new SchemaCompatibilityTestingSerializer.UpgradedSchemaSerializer() + new SchemaCompatibilityTestingSerializer(), + new SchemaCompatibilityTestingSerializer(), + new SchemaCompatibilityTestingSerializer(), + new SchemaCompatibilityTestingSerializer() }; IntermediateCompatibilityResult<?> intermediateCompatibilityResult = diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotMigrationTest.java index 5cbee5a..6dfcb64 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotMigrationTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotMigrationTest.java @@ -30,6 +30,7 @@ import org.junit.runners.Parameterized; import java.util.Collection; +import static org.apache.flink.api.common.typeutils.TypeSerializerMatchers.hasSameCompatibilityAs; import static org.junit.Assert.assertTrue; /** @@ -104,7 +105,7 @@ public class PojoSerializerSnapshotMigrationTest extends TypeSerializerSnapshotM PojoSerializer.class, 
PojoSerializerSnapshot.class, PojoSerializerSnapshotMigrationTest::testPojoWithNewAndRemovedFieldsSerializerSupplier, - hasSameCompatibilityType(TypeSerializerSchemaCompatibility.compatibleAfterMigration())); + hasSameCompatibilityAs(TypeSerializerSchemaCompatibility.compatibleAfterMigration())); return testSpecifications.get(); } diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotTest.java index 91a3414..7b8b9f2 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.base.DoubleSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.testutils.migration.SchemaCompatibilityTestingSerializer; +import org.apache.flink.testutils.migration.SchemaCompatibilityTestingSerializer.SchemaCompatibilityTestingSnapshot; import org.junit.Test; @@ -39,6 +40,7 @@ import java.util.List; import java.util.Map; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; /** @@ -120,7 +122,7 @@ public class PojoSerializerSnapshotTest { )); final TypeSerializer<TestPojo> restoredSerializer = testSnapshot.restoreSerializer(); - assertTrue(restoredSerializer.getClass() == PojoSerializer.class); + assertSame(restoredSerializer.getClass(), PojoSerializer.class); final PojoSerializer<TestPojo> restoredPojoSerializer = (PojoSerializer<TestPojo>) restoredSerializer; final Field[] restoredFields = restoredPojoSerializer.getFields(); @@ -254,13 +256,13 @@ public class PojoSerializerSnapshotTest { ID_FIELD, 
mockFieldSerializerSnapshot( NAME_FIELD, - new SchemaCompatibilityTestingSerializer.InitialSerializer<>().snapshotConfiguration()), + SchemaCompatibilityTestingSnapshot.thatIsIncompatibleWithTheNextSerializer()), HEIGHT_FIELD )); final PojoSerializer<TestPojo> newPojoSerializer = buildTestNewPojoSerializer(Arrays.asList( ID_FIELD, - mockFieldSerializer(NAME_FIELD, new SchemaCompatibilityTestingSerializer.IncompatibleSerializer<>()), + mockFieldSerializer(NAME_FIELD, new SchemaCompatibilityTestingSerializer()), HEIGHT_FIELD )); @@ -277,13 +279,12 @@ public class PojoSerializerSnapshotTest { NAME_FIELD, mockFieldSerializerSnapshot( HEIGHT_FIELD, - new SchemaCompatibilityTestingSerializer.InitialSerializer<>().snapshotConfiguration()) - )); + SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializerAfterMigration()))); final PojoSerializer<TestPojo> newPojoSerializer = buildTestNewPojoSerializer(Arrays.asList( ID_FIELD, NAME_FIELD, - mockFieldSerializer(HEIGHT_FIELD, new SchemaCompatibilityTestingSerializer.UpgradedSchemaSerializer<>()) + mockFieldSerializer(HEIGHT_FIELD, new SchemaCompatibilityTestingSerializer()) )); final TypeSerializerSchemaCompatibility<TestPojo> resultCompatibility = @@ -297,13 +298,13 @@ public class PojoSerializerSnapshotTest { final PojoSerializerSnapshot<TestPojo> testSnapshot = buildTestSnapshot(Arrays.asList( mockFieldSerializerSnapshot( ID_FIELD, - new SchemaCompatibilityTestingSerializer.InitialSerializer<>().snapshotConfiguration()), + SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializerAfterReconfiguration()), NAME_FIELD, HEIGHT_FIELD )); final PojoSerializer<TestPojo> newPojoSerializer = buildTestNewPojoSerializer(Arrays.asList( - mockFieldSerializer(ID_FIELD, new SchemaCompatibilityTestingSerializer.ReconfigurationRequiringSerializer<>()), + mockFieldSerializer(ID_FIELD, new SchemaCompatibilityTestingSerializer()), NAME_FIELD, HEIGHT_FIELD )); @@ -314,13 +315,13 @@ public class PojoSerializerSnapshotTest 
{ assertTrue(resultCompatibility.isCompatibleWithReconfiguredSerializer()); final TypeSerializer<TestPojo> reconfiguredSerializer = resultCompatibility.getReconfiguredSerializer(); - assertTrue(reconfiguredSerializer.getClass() == PojoSerializer.class); + assertSame(reconfiguredSerializer.getClass(), PojoSerializer.class); final PojoSerializer<TestPojo> reconfiguredPojoSerializer = (PojoSerializer<TestPojo>) reconfiguredSerializer; final TypeSerializer<?>[] reconfiguredFieldSerializers = reconfiguredPojoSerializer.getFieldSerializers(); assertArrayEquals( new TypeSerializer[] { - new SchemaCompatibilityTestingSerializer.InitialSerializer(), + new SchemaCompatibilityTestingSerializer(), StringSerializer.INSTANCE, DoubleSerializer.INSTANCE }, reconfiguredFieldSerializers); diff --git a/flink-core/src/test/java/org/apache/flink/testutils/migration/SchemaCompatibilityTestingSerializer.java b/flink-core/src/test/java/org/apache/flink/testutils/migration/SchemaCompatibilityTestingSerializer.java index 253eed7..5beb621 100644 --- a/flink-core/src/test/java/org/apache/flink/testutils/migration/SchemaCompatibilityTestingSerializer.java +++ b/flink-core/src/test/java/org/apache/flink/testutils/migration/SchemaCompatibilityTestingSerializer.java @@ -24,273 +24,240 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import java.io.IOException; +import javax.annotation.Nullable; + +import java.util.Objects; +import java.util.function.Function; /** * The {@link SchemaCompatibilityTestingSerializer} is a mock serializer that can be used * for schema compatibility and serializer upgrade related tests. * - * <p>To use this, tests should always first instantiate an {@link InitialSerializer} serializer - * instance and then take a snapshot of it. 
Next, depending on what the new serializer is provided - * to the taken snapshot, tests can expect the following compatibility results: + * <p>To test serializer compatibility we can start by either obtaining a {@link TypeSerializerSnapshot} and restoring + * a {@link TypeSerializer} or the other way around, starting with a serializer and calling {@link TypeSerializer#snapshotConfiguration()} + * to obtain a snapshot class. + * + * <p>To start from a snapshot, the class {@link SchemaCompatibilityTestingSnapshot} can be configured to return a predefined + * {@link TypeSerializerSchemaCompatibility} result when + * {@link TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)} is called. The following static factory + * methods return a pre-configured snapshot class: * <ul> - * <li>{@link InitialSerializer} as the new serializer: the compatibility result will be - * {@link TypeSerializerSchemaCompatibility#compatibleAsIs()}.</li> - * <li>{@link UpgradedSchemaSerializer} as the new serializer: the compatibility result will be - * {@link TypeSerializerSchemaCompatibility#compatibleAfterMigration()}.</li> - * <li>{@link ReconfigurationRequiringSerializer} as the new serializer: the compatibility result - * will be {@link TypeSerializerSchemaCompatibility#compatibleWithReconfiguredSerializer(TypeSerializer)}, - * with a new instance of the {@link InitialSerializer} as the wrapped reconfigured serializer.</li> - * <li>{@link IncompatibleSerializer} as the new serializer: the compatibility result will be - * {@link TypeSerializerSchemaCompatibility#incompatible()}.</li> + * <li>{@code thatIsCompatibleWithNextSerializer}</li> 
+ * <li>{@code thatIsCompatibleWithNextSerializerAfterReconfiguration}</li> + * <li>{@code thatIsCompatibleWithNextSerializerAfterMigration}</li> + * <li>{@code thatIsIncompatibleWithTheNextSerializer}</li> * </ul> + * + * <p>Here is a simple test example.<pre>{@code + * @Test + * public void example() { + * TypeSerializerSnapshot<?> snapshot = SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializer(); + * + * TypeSerializerSchemaCompatibility<?> result = snapshot.resolveSchemaCompatibility(new SchemaCompatibilityTestingSerializer()); + * + * assertTrue(result.isCompatibleAsIs()); + * } + * }</pre> + * + * <p>To start from a serializer, simply create a new instance of {@code SchemaCompatibilityTestingSerializer} and call + * {@link SchemaCompatibilityTestingSerializer#snapshotConfiguration()} to obtain a {@link SchemaCompatibilityTestingSnapshot}. + * To control the behaviour of the returned snapshot it is possible to pass a function from a {@code newSerializer} to a + * {@link TypeSerializerSchemaCompatibility}. + * + * <p>It is also possible to pass in a {@code String} identifier when constructing a snapshot or a serializer to tie a + * specific snapshot instance to a specific serializer instance; this might be useful when testing composite serializers. 
*/ -public abstract class SchemaCompatibilityTestingSerializer<T> extends TypeSerializer<T> { +@SuppressWarnings({"WeakerAccess", "serial"}) +public final class SchemaCompatibilityTestingSerializer extends TypeSerializer<Integer> { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2588814752302505240L; - // ------------------------------------------------------------------------------------------------ - // Irrelevant serializer methods - // ------------------------------------------------------------------------------------------------ + private final Function<TypeSerializer<Integer>, TypeSerializerSchemaCompatibility<Integer>> resolver; - @Override - public TypeSerializer<T> duplicate() { - throw new UnsupportedOperationException( - "this is a SchemaCompatibilityTestingSerializer; should have only been used for snapshotting / compatibility test purposes."); + @Nullable + private final String tokenForEqualityChecks; + + public SchemaCompatibilityTestingSerializer() { + this(null, ALWAYS_COMPATIBLE); } - @Override - public void serialize(T record, DataOutputView target) throws IOException { - throw new UnsupportedOperationException( - "this is a SchemaCompatibilityTestingSerializer; should have only been used for snapshotting / compatibility test purposes."); + public SchemaCompatibilityTestingSerializer(String tokenForEqualityChecks) { + this(tokenForEqualityChecks, ALWAYS_COMPATIBLE); + } + public SchemaCompatibilityTestingSerializer( + @Nullable String tokenForEqualityChecks, + Function<TypeSerializer<Integer>, TypeSerializerSchemaCompatibility<Integer>> resolver) { + this.resolver = resolver; + this.tokenForEqualityChecks = tokenForEqualityChecks; } @Override - public T deserialize(T reuse, DataInputView source) throws IOException { - throw new UnsupportedOperationException( - "this is a SchemaCompatibilityTestingSerializer; should have only been used for snapshotting / compatibility test purposes."); - + public 
boolean equals(Object obj) { + return (obj instanceof SchemaCompatibilityTestingSerializer) && + Objects.equals(tokenForEqualityChecks, ((SchemaCompatibilityTestingSerializer) obj).tokenForEqualityChecks); } @Override - public T deserialize(DataInputView source) throws IOException { - throw new UnsupportedOperationException( - "this is a SchemaCompatibilityTestingSerializer; should have only been used for snapshotting / compatibility test purposes."); - + public int hashCode() { + return Objects.hash(getClass(), tokenForEqualityChecks); } @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - throw new UnsupportedOperationException( - "this is a SchemaCompatibilityTestingSerializer; should have only been used for snapshotting / compatibility test purposes."); - + public String toString() { + return "SchemaCompatibilityTestingSerializer{" + + "tokenForEqualityChecks='" + tokenForEqualityChecks + '\'' + + '}'; } @Override - public T copy(T from) { - throw new UnsupportedOperationException( - "this is a SchemaCompatibilityTestingSerializer; should have only been used for snapshotting / compatibility test purposes."); - + public TypeSerializerSnapshot<Integer> snapshotConfiguration() { + return new SchemaCompatibilityTestingSnapshot(tokenForEqualityChecks, resolver); } - @Override - public T copy(T from, T reuse) { - throw new UnsupportedOperationException( - "this is a SchemaCompatibilityTestingSerializer; should have only been used for snapshotting / compatibility test purposes."); + // ----------------------------------------------------------------------------------------------------------- + // Serialization related methods are not supported + // ----------------------------------------------------------------------------------------------------------- + @Override + public boolean isImmutableType() { + throw new UnsupportedOperationException(); } @Override - public T createInstance() { - throw new 
UnsupportedOperationException( - "this is a SchemaCompatibilityTestingSerializer; should have only been used for snapshotting / compatibility test purposes."); + public TypeSerializer<Integer> duplicate() { + throw new UnsupportedOperationException(); + } + @Override + public Integer createInstance() { + throw new UnsupportedOperationException(); } @Override - public boolean isImmutableType() { - throw new UnsupportedOperationException( - "this is a SchemaCompatibilityTestingSerializer; should have only been used for snapshotting / compatibility test purposes."); + public Integer copy(Integer from) { + throw new UnsupportedOperationException(); + } + @Override + public Integer copy(Integer from, Integer reuse) { + throw new UnsupportedOperationException(); } @Override public int getLength() { - throw new UnsupportedOperationException( - "this is a SchemaCompatibilityTestingSerializer; should have only been used for snapshotting / compatibility test purposes."); - + throw new UnsupportedOperationException(); } - // ------------------------------------------------------------------------------------------------ - // Test serializer variants - // ------------------------------------------------------------------------------------------------ - - public static class InitialSerializer<T> extends SchemaCompatibilityTestingSerializer<T> { - - private static final long serialVersionUID = 1L; + @Override + public void serialize(Integer record, DataOutputView target) { + throw new UnsupportedOperationException(); + } - @Override - public TypeSerializerSnapshot<T> snapshotConfiguration() { - return new InitialSerializerSnapshot(); - } + @Override + public Integer deserialize(DataInputView source) { + throw new UnsupportedOperationException(); + } - @Override - public boolean equals(Object obj) { - return obj != null && obj.getClass() == InitialSerializer.class; - } + @Override + public Integer deserialize(Integer reuse, DataInputView source) { + throw new 
UnsupportedOperationException(); + } - @Override - public int hashCode() { - return getClass().hashCode(); - } + @Override + public void copy(DataInputView source, DataOutputView target) { + throw new UnsupportedOperationException(); } - public static class UpgradedSchemaSerializer<T> extends SchemaCompatibilityTestingSerializer<T> { + // ----------------------------------------------------------------------------------------------------------- + // Utils + // ----------------------------------------------------------------------------------------------------------- - private static final long serialVersionUID = 1L; + private static final Function<TypeSerializer<Integer>, TypeSerializerSchemaCompatibility<Integer>> ALWAYS_COMPATIBLE = + unused -> TypeSerializerSchemaCompatibility.compatibleAsIs(); - @Override - public TypeSerializerSnapshot<T> snapshotConfiguration() { - return new UpgradedSchemaSerializerSnapshot(); - } + // ----------------------------------------------------------------------------------------------------------- + // Snapshot class + // ----------------------------------------------------------------------------------------------------------- - @Override - public boolean equals(Object obj) { - return obj != null && obj.getClass() == UpgradedSchemaSerializer.class; - } + /** + * A configurable {@link TypeSerializerSnapshot} for this serializer. 
+ */ + @SuppressWarnings("WeakerAccess") + public static final class SchemaCompatibilityTestingSnapshot implements TypeSerializerSnapshot<Integer> { - @Override - public int hashCode() { - return getClass().hashCode(); + public static SchemaCompatibilityTestingSnapshot thatIsCompatibleWithNextSerializer() { + return thatIsCompatibleWithNextSerializer(null); } - } - - public static class ReconfigurationRequiringSerializer<T> extends SchemaCompatibilityTestingSerializer<T> { - - private static final long serialVersionUID = 1L; - @Override - public TypeSerializerSnapshot<T> snapshotConfiguration() { - throw new UnsupportedOperationException( - "this is a ReconfigurationRequiringSerializer; should not have been snapshotted."); + public static SchemaCompatibilityTestingSnapshot thatIsCompatibleWithNextSerializer(String tokenForEqualityChecks) { + return new SchemaCompatibilityTestingSnapshot(tokenForEqualityChecks, unused -> TypeSerializerSchemaCompatibility.compatibleAsIs()); } - @Override - public boolean equals(Object obj) { - return obj != null && obj.getClass() == ReconfigurationRequiringSerializer.class; + public static SchemaCompatibilityTestingSnapshot thatIsCompatibleWithNextSerializerAfterReconfiguration() { + return thatIsCompatibleWithNextSerializerAfterReconfiguration(null); } - @Override - public int hashCode() { - return getClass().hashCode(); + public static SchemaCompatibilityTestingSnapshot thatIsCompatibleWithNextSerializerAfterReconfiguration(String tokenForEqualityChecks) { + SchemaCompatibilityTestingSerializer reconfiguredSerializer = new SchemaCompatibilityTestingSerializer(tokenForEqualityChecks, ALWAYS_COMPATIBLE); + return new SchemaCompatibilityTestingSnapshot(tokenForEqualityChecks, unused -> TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(reconfiguredSerializer)); } - } - - public static class IncompatibleSerializer<T> extends SchemaCompatibilityTestingSerializer<T> { - - private static final long serialVersionUID = 
1L; - @Override - public TypeSerializerSnapshot<T> snapshotConfiguration() { - throw new UnsupportedOperationException( - "this is a IncompatibleSerializer; should not have been snapshotted."); + public static SchemaCompatibilityTestingSnapshot thatIsCompatibleWithNextSerializerAfterMigration() { + return thatIsCompatibleWithNextSerializerAfterMigration(null); } - @Override - public boolean equals(Object obj) { - return obj != null && obj.getClass() == IncompatibleSerializer.class; + public static SchemaCompatibilityTestingSnapshot thatIsCompatibleWithNextSerializerAfterMigration(String tokenForEqualityChecks) { + return new SchemaCompatibilityTestingSnapshot(tokenForEqualityChecks, unused -> TypeSerializerSchemaCompatibility.compatibleAfterMigration()); } - @Override - public int hashCode() { - return getClass().hashCode(); + public static SchemaCompatibilityTestingSnapshot thatIsIncompatibleWithTheNextSerializer() { + return thatIsIncompatibleWithTheNextSerializer(null); } - } - - // ------------------------------------------------------------------------------------------------ - // Test serializer snapshots - // ------------------------------------------------------------------------------------------------ - public class InitialSerializerSnapshot implements TypeSerializerSnapshot<T> { - - @Override - public int getCurrentVersion() { - return 1; + public static SchemaCompatibilityTestingSnapshot thatIsIncompatibleWithTheNextSerializer(String tokenForEqualityChecks) { + return new SchemaCompatibilityTestingSnapshot(tokenForEqualityChecks, unused -> TypeSerializerSchemaCompatibility.incompatible()); } - @Override - public void writeSnapshot(DataOutputView out) throws IOException {} - - @Override - public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {} + @Nullable + private final String tokenForEqualityChecks; + private final Function<TypeSerializer<Integer>, TypeSerializerSchemaCompatibility<Integer>> 
resolver; - @Override - public TypeSerializer<T> restoreSerializer() { - return new SchemaCompatibilityTestingSerializer.InitialSerializer<>(); + SchemaCompatibilityTestingSnapshot(@Nullable String tokenForEqualityChecks, Function<TypeSerializer<Integer>, TypeSerializerSchemaCompatibility<Integer>> resolver) { + this.tokenForEqualityChecks = tokenForEqualityChecks; + this.resolver = resolver; } @Override - public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) { - if (newSerializer instanceof SchemaCompatibilityTestingSerializer.InitialSerializer) { - return TypeSerializerSchemaCompatibility.compatibleAsIs(); - } - - if (newSerializer instanceof SchemaCompatibilityTestingSerializer.UpgradedSchemaSerializer) { - return TypeSerializerSchemaCompatibility.compatibleAfterMigration(); - } - - if (newSerializer instanceof SchemaCompatibilityTestingSerializer.ReconfigurationRequiringSerializer) { - return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(new SchemaCompatibilityTestingSerializer.InitialSerializer<T>()); + public TypeSerializerSchemaCompatibility<Integer> resolveSchemaCompatibility(TypeSerializer<Integer> newSerializer) { + if (!(newSerializer instanceof SchemaCompatibilityTestingSerializer)) { + return TypeSerializerSchemaCompatibility.incompatible(); } - - if (newSerializer instanceof SchemaCompatibilityTestingSerializer.IncompatibleSerializer) { + SchemaCompatibilityTestingSerializer schemaCompatibilityTestingSerializer = (SchemaCompatibilityTestingSerializer) newSerializer; + if (!(Objects.equals(schemaCompatibilityTestingSerializer.tokenForEqualityChecks, tokenForEqualityChecks))) { return TypeSerializerSchemaCompatibility.incompatible(); } - - // reaches here if newSerializer isn't a SchemaCompatibilityTestingSerializer - return TypeSerializerSchemaCompatibility.incompatible(); + return resolver.apply(newSerializer); } - } - - public class UpgradedSchemaSerializerSnapshot implements 
TypeSerializerSnapshot<T> { @Override public int getCurrentVersion() { - return 1; + throw new UnsupportedOperationException(); } @Override - public void writeSnapshot(DataOutputView out) throws IOException {} - - @Override - public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {} + public void writeSnapshot(DataOutputView out) { + throw new UnsupportedOperationException(); + } @Override - public TypeSerializer<T> restoreSerializer() { - return new SchemaCompatibilityTestingSerializer.UpgradedSchemaSerializer<>(); + public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) { + throw new UnsupportedOperationException(); } @Override - public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) { - if (newSerializer instanceof SchemaCompatibilityTestingSerializer.InitialSerializer) { - return TypeSerializerSchemaCompatibility.incompatible(); - } - - if (newSerializer instanceof SchemaCompatibilityTestingSerializer.UpgradedSchemaSerializer) { - return TypeSerializerSchemaCompatibility.compatibleAsIs(); - } - - if (newSerializer instanceof SchemaCompatibilityTestingSerializer.ReconfigurationRequiringSerializer) { - return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(new SchemaCompatibilityTestingSerializer.UpgradedSchemaSerializer<T>()); - } - - if (newSerializer instanceof SchemaCompatibilityTestingSerializer.IncompatibleSerializer) { - return TypeSerializerSchemaCompatibility.incompatible(); - } - - // reaches here if newSerializer isn't a SchemaCompatibilityTestingSerializer - return TypeSerializerSchemaCompatibility.incompatible(); + public TypeSerializer<Integer> restoreSerializer() { + return new SchemaCompatibilityTestingSerializer(tokenForEqualityChecks, resolver); } } }
