This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6ac771a09c107444e0ddc205aeaaa8dafd7998c6
Author: Igal Shilman <[email protected]>
AuthorDate: Tue Feb 26 20:59:36 2019 +0100

    [FLINK-11753] [tests] Refactor SchemaCompatibilityTestingSerializer
    
    This closes #7845.
---
 .../typeutils/CompositeTypeSerializerUtilTest.java |  56 ++--
 .../PojoSerializerSnapshotMigrationTest.java       |   3 +-
 .../runtime/PojoSerializerSnapshotTest.java        |  21 +-
 .../SchemaCompatibilityTestingSerializer.java      | 327 +++++++++------------
 4 files changed, 189 insertions(+), 218 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtilTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtilTest.java
index 37edcd7..206fc13 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtilTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUtilTest.java
@@ -19,8 +19,10 @@
 package org.apache.flink.api.common.typeutils;
 
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.IntermediateCompatibilityResult;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import 
org.apache.flink.testutils.migration.SchemaCompatibilityTestingSerializer;
+import 
org.apache.flink.testutils.migration.SchemaCompatibilityTestingSerializer.SchemaCompatibilityTestingSnapshot;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -38,13 +40,13 @@ public class CompositeTypeSerializerUtilTest {
        @Test
        public void testCompatibleAsIsIntermediateCompatibilityResult() {
                final TypeSerializerSnapshot<?>[] testSerializerSnapshots = new 
TypeSerializerSnapshot<?>[] {
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
+                       
SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializer("first 
serializer"),
+                       
SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializer("second 
serializer"),
                };
 
                final TypeSerializer<?>[] testNewSerializers = new 
TypeSerializer<?>[] {
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer(),
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer(),
+                       new SchemaCompatibilityTestingSerializer("first 
serializer"),
+                       new SchemaCompatibilityTestingSerializer("second 
serializer"),
                };
 
                IntermediateCompatibilityResult<?> 
intermediateCompatibilityResult =
@@ -58,39 +60,39 @@ public class CompositeTypeSerializerUtilTest {
        @Test
        public void 
testCompatibleWithReconfiguredSerializerIntermediateCompatibilityResult() {
                final TypeSerializerSnapshot<?>[] testSerializerSnapshots = new 
TypeSerializerSnapshot<?>[] {
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
+                       
SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializer("a"),
+                       
SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializerAfterReconfiguration("b"),
                };
 
                final TypeSerializer<?>[] testNewSerializers = new 
TypeSerializer<?>[] {
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer(),
-                       new 
SchemaCompatibilityTestingSerializer.ReconfigurationRequiringSerializer<>(),
+                       new SchemaCompatibilityTestingSerializer("a"),
+                       new SchemaCompatibilityTestingSerializer("b"),
                };
 
                IntermediateCompatibilityResult<?> 
intermediateCompatibilityResult =
                        
CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(testNewSerializers,
 testSerializerSnapshots);
 
-               
assertTrue(intermediateCompatibilityResult.isCompatibleWithReconfiguredSerializer());
-
                final TypeSerializer<?>[] expectedReconfiguredNestedSerializers 
= new TypeSerializer<?>[] {
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer(),
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer(),
+                       new SchemaCompatibilityTestingSerializer("a"),
+                       new SchemaCompatibilityTestingSerializer("b"),
                };
+
+               
assertTrue(intermediateCompatibilityResult.isCompatibleWithReconfiguredSerializer());
                assertArrayEquals(expectedReconfiguredNestedSerializers, 
intermediateCompatibilityResult.getNestedSerializers());
        }
 
        @Test
        public void 
testCompatibleAfterMigrationIntermediateCompatibilityResult() {
                final TypeSerializerSnapshot<?>[] testSerializerSnapshots = new 
TypeSerializerSnapshot<?>[] {
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
+                       
SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializerAfterReconfiguration("a"),
+                       
SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializerAfterMigration("b"),
+                       
SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializer("c"),
                };
 
                final TypeSerializer<?>[] testNewSerializers = new 
TypeSerializer<?>[] {
-                       new 
SchemaCompatibilityTestingSerializer.ReconfigurationRequiringSerializer<>(),
-                       new 
SchemaCompatibilityTestingSerializer.UpgradedSchemaSerializer<>(),
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer()
+                       new SchemaCompatibilityTestingSerializer("a"),
+                       new SchemaCompatibilityTestingSerializer("b"),
+                       new SchemaCompatibilityTestingSerializer("c")
                };
 
                IntermediateCompatibilityResult<?> 
intermediateCompatibilityResult =
@@ -103,17 +105,17 @@ public class CompositeTypeSerializerUtilTest {
        @Test
        public void testIncompatibleIntermediateCompatibilityResult() {
                final TypeSerializerSnapshot<?>[] testSerializerSnapshots = new 
TypeSerializerSnapshot<?>[] {
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer().snapshotConfiguration(),
+                       
SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializer(),
+                       
SchemaCompatibilityTestingSnapshot.thatIsIncompatibleWithTheNextSerializer(),
+                       
SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializerAfterReconfiguration(),
+                       
SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializerAfterMigration(),
                };
 
                final TypeSerializer<?>[] testNewSerializers = new 
TypeSerializer<?>[] {
-                       new 
SchemaCompatibilityTestingSerializer.InitialSerializer(),
-                       new 
SchemaCompatibilityTestingSerializer.IncompatibleSerializer<>(),
-                       new 
SchemaCompatibilityTestingSerializer.ReconfigurationRequiringSerializer<>(),
-                       new 
SchemaCompatibilityTestingSerializer.UpgradedSchemaSerializer()
+                       new SchemaCompatibilityTestingSerializer(),
+                       new SchemaCompatibilityTestingSerializer(),
+                       new SchemaCompatibilityTestingSerializer(),
+                       new SchemaCompatibilityTestingSerializer()
                };
 
                IntermediateCompatibilityResult<?> 
intermediateCompatibilityResult =
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotMigrationTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotMigrationTest.java
index 5cbee5a..6dfcb64 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotMigrationTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotMigrationTest.java
@@ -30,6 +30,7 @@ import org.junit.runners.Parameterized;
 
 import java.util.Collection;
 
+import static 
org.apache.flink.api.common.typeutils.TypeSerializerMatchers.hasSameCompatibilityAs;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -104,7 +105,7 @@ public class PojoSerializerSnapshotMigrationTest extends 
TypeSerializerSnapshotM
                        PojoSerializer.class,
                        PojoSerializerSnapshot.class,
                        
PojoSerializerSnapshotMigrationTest::testPojoWithNewAndRemovedFieldsSerializerSupplier,
-                       
hasSameCompatibilityType(TypeSerializerSchemaCompatibility.compatibleAfterMigration()));
+                       
hasSameCompatibilityAs(TypeSerializerSchemaCompatibility.compatibleAfterMigration()));
 
                return testSpecifications.get();
        }
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotTest.java
index 91a3414..7b8b9f2 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshotTest.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import 
org.apache.flink.testutils.migration.SchemaCompatibilityTestingSerializer;
+import 
org.apache.flink.testutils.migration.SchemaCompatibilityTestingSerializer.SchemaCompatibilityTestingSnapshot;
 
 import org.junit.Test;
 
@@ -39,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -120,7 +122,7 @@ public class PojoSerializerSnapshotTest {
                ));
 
                final TypeSerializer<TestPojo> restoredSerializer = 
testSnapshot.restoreSerializer();
-               assertTrue(restoredSerializer.getClass() == 
PojoSerializer.class);
+               assertSame(restoredSerializer.getClass(), PojoSerializer.class);
                final PojoSerializer<TestPojo> restoredPojoSerializer = 
(PojoSerializer<TestPojo>) restoredSerializer;
 
                final Field[] restoredFields = 
restoredPojoSerializer.getFields();
@@ -254,13 +256,13 @@ public class PojoSerializerSnapshotTest {
                        ID_FIELD,
                        mockFieldSerializerSnapshot(
                                NAME_FIELD,
-                               new 
SchemaCompatibilityTestingSerializer.InitialSerializer<>().snapshotConfiguration()),
+                               
SchemaCompatibilityTestingSnapshot.thatIsIncompatibleWithTheNextSerializer()),
                        HEIGHT_FIELD
                ));
 
                final PojoSerializer<TestPojo> newPojoSerializer = 
buildTestNewPojoSerializer(Arrays.asList(
                        ID_FIELD,
-                       mockFieldSerializer(NAME_FIELD, new 
SchemaCompatibilityTestingSerializer.IncompatibleSerializer<>()),
+                       mockFieldSerializer(NAME_FIELD, new 
SchemaCompatibilityTestingSerializer()),
                        HEIGHT_FIELD
                ));
 
@@ -277,13 +279,12 @@ public class PojoSerializerSnapshotTest {
                        NAME_FIELD,
                        mockFieldSerializerSnapshot(
                                HEIGHT_FIELD,
-                               new 
SchemaCompatibilityTestingSerializer.InitialSerializer<>().snapshotConfiguration())
-               ));
+                               
SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializerAfterMigration())));
 
                final PojoSerializer<TestPojo> newPojoSerializer = 
buildTestNewPojoSerializer(Arrays.asList(
                        ID_FIELD,
                        NAME_FIELD,
-                       mockFieldSerializer(HEIGHT_FIELD, new 
SchemaCompatibilityTestingSerializer.UpgradedSchemaSerializer<>())
+                       mockFieldSerializer(HEIGHT_FIELD, new 
SchemaCompatibilityTestingSerializer())
                ));
 
                final TypeSerializerSchemaCompatibility<TestPojo> 
resultCompatibility =
@@ -297,13 +298,13 @@ public class PojoSerializerSnapshotTest {
                final PojoSerializerSnapshot<TestPojo> testSnapshot = 
buildTestSnapshot(Arrays.asList(
                        mockFieldSerializerSnapshot(
                                ID_FIELD,
-                               new 
SchemaCompatibilityTestingSerializer.InitialSerializer<>().snapshotConfiguration()),
+                               
SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializerAfterReconfiguration()),
                        NAME_FIELD,
                        HEIGHT_FIELD
                ));
 
                final PojoSerializer<TestPojo> newPojoSerializer = 
buildTestNewPojoSerializer(Arrays.asList(
-                       mockFieldSerializer(ID_FIELD, new 
SchemaCompatibilityTestingSerializer.ReconfigurationRequiringSerializer<>()),
+                       mockFieldSerializer(ID_FIELD, new 
SchemaCompatibilityTestingSerializer()),
                        NAME_FIELD,
                        HEIGHT_FIELD
                ));
@@ -314,13 +315,13 @@ public class PojoSerializerSnapshotTest {
                
assertTrue(resultCompatibility.isCompatibleWithReconfiguredSerializer());
 
                final TypeSerializer<TestPojo> reconfiguredSerializer = 
resultCompatibility.getReconfiguredSerializer();
-               assertTrue(reconfiguredSerializer.getClass() == 
PojoSerializer.class);
+               assertSame(reconfiguredSerializer.getClass(), 
PojoSerializer.class);
                final PojoSerializer<TestPojo> reconfiguredPojoSerializer = 
(PojoSerializer<TestPojo>) reconfiguredSerializer;
 
                final TypeSerializer<?>[] reconfiguredFieldSerializers = 
reconfiguredPojoSerializer.getFieldSerializers();
                assertArrayEquals(
                        new TypeSerializer[] {
-                               new 
SchemaCompatibilityTestingSerializer.InitialSerializer(),
+                               new SchemaCompatibilityTestingSerializer(),
                                StringSerializer.INSTANCE,
                                DoubleSerializer.INSTANCE },
                        reconfiguredFieldSerializers);
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/migration/SchemaCompatibilityTestingSerializer.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/migration/SchemaCompatibilityTestingSerializer.java
index 253eed7..5beb621 100644
--- 
a/flink-core/src/test/java/org/apache/flink/testutils/migration/SchemaCompatibilityTestingSerializer.java
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/migration/SchemaCompatibilityTestingSerializer.java
@@ -24,273 +24,240 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-import java.io.IOException;
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.function.Function;
 
 /**
  * The {@link SchemaCompatibilityTestingSerializer} is a mock serializer that 
can be used
  * for schema compatibility and serializer upgrade related tests.
  *
- * <p>To use this, tests should always first instantiate an {@link 
InitialSerializer} serializer
- * instance and then take a snapshot of it. Next, depending on what the new 
serializer is provided
- * to the taken snapshot, tests can expect the following compatibility results:
+ * <p>To test serializer compatibility we can start by either obtaining a 
{@link TypeSerializerSnapshot} and restoring
+ * a {@link TypeSerializer} or the other way around, starting with a 
serializer and calling {@link TypeSerializer#snapshotConfiguration()}
+ * to obtain a snapshot.
+ *
+ * <p>To start from a snapshot, the class {@link 
SchemaCompatibilityTestingSnapshot} can be configured to return a predefined
+ * {@link TypeSerializerSchemaCompatibility} result when
+ * {@link TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)} 
is called. The following static factory
+ * methods return a pre-configured snapshot:
  * <ul>
- *     <li>{@link InitialSerializer} as the new serializer: the compatibility 
result will be
- *         {@link TypeSerializerSchemaCompatibility#compatibleAsIs()}.</li>
- *     <li>{@link UpgradedSchemaSerializer} as the new serializer: the 
compatibility result will be
- *         {@link 
TypeSerializerSchemaCompatibility#compatibleAfterMigration()}.</li>
- *     <li>{@link ReconfigurationRequiringSerializer} as the new serializer: 
the compatibility result
- *         will be {@link 
TypeSerializerSchemaCompatibility#compatibleWithReconfiguredSerializer(TypeSerializer)},
- *         with a new instance of the {@link InitialSerializer} as the wrapped 
reconfigured serializer.</li>
- *     <li>{@link IncompatibleSerializer} as the new serializer: the 
compatibility result will be
- *         {@link TypeSerializerSchemaCompatibility#incompatible()}.</li>
+ * <li>{@code thatIsCompatibleWithNextSerializer}</li>
+ * <li>{@code thatIsCompatibleWithNextSerializerAfterReconfiguration}</li>
+ * <li>{@code thatIsCompatibleWithNextSerializerAfterMigration}</li>
+ * <li>{@code thatIsIncompatibleWithTheNextSerializer}</li>
  * </ul>
+ *
+ * <p>Here is a simple test example.<pre>{@code
+ * @Test
+ * public void example() {
+ *     TypeSerializerSnapshot<?> snapshot = 
SchemaCompatibilityTestingSnapshot.thatIsCompatibleWithNextSerializer();
+ *
+ *     TypeSerializerSchemaCompatibility<?> result = 
snapshot.resolveSchemaCompatibility(new SchemaCompatibilityTestingSerializer());
+ *
+ *     assertTrue(result.isCompatibleAsIs());
+ * }
+ * }</pre>
+ *
+ * <p>To start from a serializer, simply create a new instance of {@code 
SchemaCompatibilityTestingSerializer} and call
+ * {@link SchemaCompatibilityTestingSerializer#snapshotConfiguration()} to 
obtain a {@link SchemaCompatibilityTestingSnapshot}.
+ * To control the behaviour of the returned snapshot it is possible to pass a 
function from a {@code newSerializer} to a
+ * {@link TypeSerializerSchemaCompatibility}.
+ *
+ * <p>It is also possible to pass in a {@code String} identifier when 
constructing a snapshot or a serializer to tie a
+ * specific snapshot instance to a specific serializer instance; this might be 
useful when testing composite serializers.
  */
-public abstract class SchemaCompatibilityTestingSerializer<T> extends 
TypeSerializer<T> {
+@SuppressWarnings({"WeakerAccess", "serial"})
+public final class SchemaCompatibilityTestingSerializer extends 
TypeSerializer<Integer> {
 
-       private static final long serialVersionUID = 1L;
+       private static final long serialVersionUID = 2588814752302505240L;
 
-       // 
------------------------------------------------------------------------------------------------
-       //  Irrelevant serializer methods
-       // 
------------------------------------------------------------------------------------------------
+       private final Function<TypeSerializer<Integer>, 
TypeSerializerSchemaCompatibility<Integer>> resolver;
 
-       @Override
-       public TypeSerializer<T> duplicate() {
-               throw new UnsupportedOperationException(
-                       "this is a SchemaCompatibilityTestingSerializer; should 
have only been used for snapshotting / compatibility test purposes.");
+       @Nullable
+       private final String tokenForEqualityChecks;
+
+       public SchemaCompatibilityTestingSerializer() {
+               this(null, ALWAYS_COMPATIBLE);
        }
 
-       @Override
-       public void serialize(T record, DataOutputView target) throws 
IOException {
-               throw new UnsupportedOperationException(
-                       "this is a SchemaCompatibilityTestingSerializer; should 
have only been used for snapshotting / compatibility test purposes.");
+       public SchemaCompatibilityTestingSerializer(String 
tokenForEqualityChecks) {
+               this(tokenForEqualityChecks, ALWAYS_COMPATIBLE);
+       }
 
+       public SchemaCompatibilityTestingSerializer(
+               @Nullable String tokenForEqualityChecks,
+               Function<TypeSerializer<Integer>, 
TypeSerializerSchemaCompatibility<Integer>> resolver) {
+               this.resolver = resolver;
+               this.tokenForEqualityChecks = tokenForEqualityChecks;
        }
 
        @Override
-       public T deserialize(T reuse, DataInputView source) throws IOException {
-               throw new UnsupportedOperationException(
-                       "this is a SchemaCompatibilityTestingSerializer; should 
have only been used for snapshotting / compatibility test purposes.");
-
+       public boolean equals(Object obj) {
+               return (obj instanceof SchemaCompatibilityTestingSerializer) &&
+                       Objects.equals(tokenForEqualityChecks, 
((SchemaCompatibilityTestingSerializer) obj).tokenForEqualityChecks);
        }
 
        @Override
-       public T deserialize(DataInputView source) throws IOException {
-               throw new UnsupportedOperationException(
-                       "this is a SchemaCompatibilityTestingSerializer; should 
have only been used for snapshotting / compatibility test purposes.");
-
+       public int hashCode() {
+               return Objects.hash(getClass(), tokenForEqualityChecks);
        }
 
        @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               throw new UnsupportedOperationException(
-                       "this is a SchemaCompatibilityTestingSerializer; should 
have only been used for snapshotting / compatibility test purposes.");
-
+       public String toString() {
+               return "SchemaCompatibilityTestingSerializer{" +
+                       "tokenForEqualityChecks='" + tokenForEqualityChecks + 
'\'' +
+                       '}';
        }
 
        @Override
-       public T copy(T from) {
-               throw new UnsupportedOperationException(
-                       "this is a SchemaCompatibilityTestingSerializer; should 
have only been used for snapshotting / compatibility test purposes.");
-
+       public TypeSerializerSnapshot<Integer> snapshotConfiguration() {
+               return new 
SchemaCompatibilityTestingSnapshot(tokenForEqualityChecks, resolver);
        }
 
-       @Override
-       public T copy(T from, T reuse) {
-               throw new UnsupportedOperationException(
-                       "this is a SchemaCompatibilityTestingSerializer; should 
have only been used for snapshotting / compatibility test purposes.");
+       // 
-----------------------------------------------------------------------------------------------------------
+       // Serialization related methods are not supported
+       // 
-----------------------------------------------------------------------------------------------------------
 
+       @Override
+       public boolean isImmutableType() {
+               throw new UnsupportedOperationException();
        }
 
        @Override
-       public T createInstance() {
-               throw new UnsupportedOperationException(
-                       "this is a SchemaCompatibilityTestingSerializer; should 
have only been used for snapshotting / compatibility test purposes.");
+       public TypeSerializer<Integer> duplicate() {
+               throw new UnsupportedOperationException();
+       }
 
+       @Override
+       public Integer createInstance() {
+               throw new UnsupportedOperationException();
        }
 
        @Override
-       public boolean isImmutableType() {
-               throw new UnsupportedOperationException(
-                       "this is a SchemaCompatibilityTestingSerializer; should 
have only been used for snapshotting / compatibility test purposes.");
+       public Integer copy(Integer from) {
+               throw new UnsupportedOperationException();
+       }
 
+       @Override
+       public Integer copy(Integer from, Integer reuse) {
+               throw new UnsupportedOperationException();
        }
 
        @Override
        public int getLength() {
-               throw new UnsupportedOperationException(
-                       "this is a SchemaCompatibilityTestingSerializer; should 
have only been used for snapshotting / compatibility test purposes.");
-
+               throw new UnsupportedOperationException();
        }
 
-       // 
------------------------------------------------------------------------------------------------
-       //  Test serializer variants
-       // 
------------------------------------------------------------------------------------------------
-
-       public static class InitialSerializer<T> extends 
SchemaCompatibilityTestingSerializer<T> {
-
-               private static final long serialVersionUID = 1L;
+       @Override
+       public void serialize(Integer record, DataOutputView target) {
+               throw new UnsupportedOperationException();
+       }
 
-               @Override
-               public TypeSerializerSnapshot<T> snapshotConfiguration() {
-                       return new InitialSerializerSnapshot();
-               }
+       @Override
+       public Integer deserialize(DataInputView source) {
+               throw new UnsupportedOperationException();
+       }
 
-               @Override
-               public boolean equals(Object obj) {
-                       return obj != null && obj.getClass() == 
InitialSerializer.class;
-               }
+       @Override
+       public Integer deserialize(Integer reuse, DataInputView source) {
+               throw new UnsupportedOperationException();
+       }
 
-               @Override
-               public int hashCode() {
-                       return getClass().hashCode();
-               }
+       @Override
+       public void copy(DataInputView source, DataOutputView target) {
+               throw new UnsupportedOperationException();
        }
 
-       public static class UpgradedSchemaSerializer<T> extends 
SchemaCompatibilityTestingSerializer<T> {
+       // 
-----------------------------------------------------------------------------------------------------------
+       // Utils
+       // 
-----------------------------------------------------------------------------------------------------------
 
-               private static final long serialVersionUID = 1L;
+       private static final Function<TypeSerializer<Integer>, 
TypeSerializerSchemaCompatibility<Integer>> ALWAYS_COMPATIBLE =
+               unused -> TypeSerializerSchemaCompatibility.compatibleAsIs();
 
-               @Override
-               public TypeSerializerSnapshot<T> snapshotConfiguration() {
-                       return new UpgradedSchemaSerializerSnapshot();
-               }
+       // 
-----------------------------------------------------------------------------------------------------------
+       // Snapshot class
+       // 
-----------------------------------------------------------------------------------------------------------
 
-               @Override
-               public boolean equals(Object obj) {
-                       return obj != null && obj.getClass() == 
UpgradedSchemaSerializer.class;
-               }
+       /**
+        * A configurable {@link TypeSerializerSnapshot} for this serializer.
+        */
+       @SuppressWarnings("WeakerAccess")
+       public static final class SchemaCompatibilityTestingSnapshot implements 
TypeSerializerSnapshot<Integer> {
 
-               @Override
-               public int hashCode() {
-                       return getClass().hashCode();
+               public static SchemaCompatibilityTestingSnapshot 
thatIsCompatibleWithNextSerializer() {
+                       return thatIsCompatibleWithNextSerializer(null);
                }
-       }
-
-       public static class ReconfigurationRequiringSerializer<T> extends 
SchemaCompatibilityTestingSerializer<T> {
-
-               private static final long serialVersionUID = 1L;
 
-               @Override
-               public TypeSerializerSnapshot<T> snapshotConfiguration() {
-                       throw new UnsupportedOperationException(
-                               "this is a ReconfigurationRequiringSerializer; 
should not have been snapshotted.");
+               public static SchemaCompatibilityTestingSnapshot 
thatIsCompatibleWithNextSerializer(String tokenForEqualityChecks) {
+                       return new 
SchemaCompatibilityTestingSnapshot(tokenForEqualityChecks, unused -> 
TypeSerializerSchemaCompatibility.compatibleAsIs());
                }
 
-               @Override
-               public boolean equals(Object obj) {
-                       return obj != null && obj.getClass() == 
ReconfigurationRequiringSerializer.class;
+               public static SchemaCompatibilityTestingSnapshot 
thatIsCompatibleWithNextSerializerAfterReconfiguration() {
+                       return 
thatIsCompatibleWithNextSerializerAfterReconfiguration(null);
                }
 
-               @Override
-               public int hashCode() {
-                       return getClass().hashCode();
+               public static SchemaCompatibilityTestingSnapshot 
thatIsCompatibleWithNextSerializerAfterReconfiguration(String 
tokenForEqualityChecks) {
+                       SchemaCompatibilityTestingSerializer 
reconfiguredSerializer = new 
SchemaCompatibilityTestingSerializer(tokenForEqualityChecks, ALWAYS_COMPATIBLE);
+                       return new 
SchemaCompatibilityTestingSnapshot(tokenForEqualityChecks, unused -> 
TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(reconfiguredSerializer));
                }
-       }
-
-       public static class IncompatibleSerializer<T> extends 
SchemaCompatibilityTestingSerializer<T> {
-
-               private static final long serialVersionUID = 1L;
 
-               @Override
-               public TypeSerializerSnapshot<T> snapshotConfiguration() {
-                       throw new UnsupportedOperationException(
-                               "this is a IncompatibleSerializer; should not 
have been snapshotted.");
+               public static SchemaCompatibilityTestingSnapshot 
thatIsCompatibleWithNextSerializerAfterMigration() {
+                       return 
thatIsCompatibleWithNextSerializerAfterMigration(null);
                }
 
-               @Override
-               public boolean equals(Object obj) {
-                       return obj != null && obj.getClass() == 
IncompatibleSerializer.class;
+               public static SchemaCompatibilityTestingSnapshot 
thatIsCompatibleWithNextSerializerAfterMigration(String tokenForEqualityChecks) 
{
+                       return new 
SchemaCompatibilityTestingSnapshot(tokenForEqualityChecks, unused -> 
TypeSerializerSchemaCompatibility.compatibleAfterMigration());
                }
 
-               @Override
-               public int hashCode() {
-                       return getClass().hashCode();
+               public static SchemaCompatibilityTestingSnapshot 
thatIsIncompatibleWithTheNextSerializer() {
+                       return thatIsIncompatibleWithTheNextSerializer(null);
                }
-       }
-
-       // 
------------------------------------------------------------------------------------------------
-       //  Test serializer snapshots
-       // 
------------------------------------------------------------------------------------------------
 
-       public class InitialSerializerSnapshot implements 
TypeSerializerSnapshot<T> {
-
-               @Override
-               public int getCurrentVersion() {
-                       return 1;
+               public static SchemaCompatibilityTestingSnapshot 
thatIsIncompatibleWithTheNextSerializer(String tokenForEqualityChecks) {
+                       return new 
SchemaCompatibilityTestingSnapshot(tokenForEqualityChecks, unused -> 
TypeSerializerSchemaCompatibility.incompatible());
                }
 
-               @Override
-               public void writeSnapshot(DataOutputView out) throws 
IOException {}
-
-               @Override
-               public void readSnapshot(int readVersion, DataInputView in, 
ClassLoader userCodeClassLoader) throws IOException {}
+               @Nullable
+               private final String tokenForEqualityChecks;
+               private final Function<TypeSerializer<Integer>, 
TypeSerializerSchemaCompatibility<Integer>> resolver;
 
-               @Override
-               public TypeSerializer<T> restoreSerializer() {
-                       return new 
SchemaCompatibilityTestingSerializer.InitialSerializer<>();
+               SchemaCompatibilityTestingSnapshot(@Nullable String 
tokenForEqualityChecks, Function<TypeSerializer<Integer>, 
TypeSerializerSchemaCompatibility<Integer>> resolver) {
+                       this.tokenForEqualityChecks = tokenForEqualityChecks;
+                       this.resolver = resolver;
                }
 
                @Override
-               public TypeSerializerSchemaCompatibility<T> 
resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
-                       if (newSerializer instanceof 
SchemaCompatibilityTestingSerializer.InitialSerializer) {
-                               return 
TypeSerializerSchemaCompatibility.compatibleAsIs();
-                       }
-
-                       if (newSerializer instanceof 
SchemaCompatibilityTestingSerializer.UpgradedSchemaSerializer) {
-                               return 
TypeSerializerSchemaCompatibility.compatibleAfterMigration();
-                       }
-
-                       if (newSerializer instanceof 
SchemaCompatibilityTestingSerializer.ReconfigurationRequiringSerializer) {
-                               return 
TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(new 
SchemaCompatibilityTestingSerializer.InitialSerializer<T>());
+               public TypeSerializerSchemaCompatibility<Integer> 
resolveSchemaCompatibility(TypeSerializer<Integer> newSerializer) {
+                       if (!(newSerializer instanceof 
SchemaCompatibilityTestingSerializer)) {
+                               return 
TypeSerializerSchemaCompatibility.incompatible();
                        }
-
-                       if (newSerializer instanceof 
SchemaCompatibilityTestingSerializer.IncompatibleSerializer) {
+                       SchemaCompatibilityTestingSerializer 
schemaCompatibilityTestingSerializer = (SchemaCompatibilityTestingSerializer) 
newSerializer;
+                       if 
(!(Objects.equals(schemaCompatibilityTestingSerializer.tokenForEqualityChecks, 
tokenForEqualityChecks))) {
                                return 
TypeSerializerSchemaCompatibility.incompatible();
                        }
-
-                       // reaches here if newSerializer isn't a 
SchemaCompatibilityTestingSerializer
-                       return TypeSerializerSchemaCompatibility.incompatible();
+                       return resolver.apply(newSerializer);
                }
-       }
-
-       public class UpgradedSchemaSerializerSnapshot implements 
TypeSerializerSnapshot<T> {
 
                @Override
                public int getCurrentVersion() {
-                       return 1;
+                       throw new UnsupportedOperationException();
                }
 
                @Override
-               public void writeSnapshot(DataOutputView out) throws 
IOException {}
-
-               @Override
-               public void readSnapshot(int readVersion, DataInputView in, 
ClassLoader userCodeClassLoader) throws IOException {}
+               public void writeSnapshot(DataOutputView out) {
+                       throw new UnsupportedOperationException();
+               }
 
                @Override
-               public TypeSerializer<T> restoreSerializer() {
-                       return new 
SchemaCompatibilityTestingSerializer.UpgradedSchemaSerializer<>();
+               public void readSnapshot(int readVersion, DataInputView in, 
ClassLoader userCodeClassLoader) {
+                       throw new UnsupportedOperationException();
                }
 
                @Override
-               public TypeSerializerSchemaCompatibility<T> 
resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
-                       if (newSerializer instanceof 
SchemaCompatibilityTestingSerializer.InitialSerializer) {
-                               return 
TypeSerializerSchemaCompatibility.incompatible();
-                       }
-
-                       if (newSerializer instanceof 
SchemaCompatibilityTestingSerializer.UpgradedSchemaSerializer) {
-                               return 
TypeSerializerSchemaCompatibility.compatibleAsIs();
-                       }
-
-                       if (newSerializer instanceof 
SchemaCompatibilityTestingSerializer.ReconfigurationRequiringSerializer) {
-                               return 
TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(new 
SchemaCompatibilityTestingSerializer.UpgradedSchemaSerializer<T>());
-                       }
-
-                       if (newSerializer instanceof 
SchemaCompatibilityTestingSerializer.IncompatibleSerializer) {
-                               return 
TypeSerializerSchemaCompatibility.incompatible();
-                       }
-
-                       // reaches here if newSerializer isn't a 
SchemaCompatibilityTestingSerializer
-                       return TypeSerializerSchemaCompatibility.incompatible();
+               public TypeSerializer<Integer> restoreSerializer() {
+                       return new 
SchemaCompatibilityTestingSerializer(tokenForEqualityChecks, resolver);
                }
        }
 }

Reply via email to