This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9a709146769b47727b8887807b882db50ffd8c0f
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Tue Jan 14 13:25:22 2020 +0100

    [FLINK-13632] Port AvroSerializer upgrade test to 
TypeSerializerUpgradeTestBase
---
 .../typeutils/AvroSerializerMigrationTest.java     | 259 ---------------------
 .../avro/typeutils/AvroSerializerUpgradeTest.java  | 185 +++++++++++++++
 ...6-avro-generic-type-serializer-address-snapshot | Bin 901 -> 0 bytes
 .../resources/flink-1.6-avro-type-serialized-data  | Bin 23563 -> 0 bytes
 .../flink-1.6-avro-type-serializer-address-data    | Bin 240 -> 0 bytes
 ...flink-1.6-avro-type-serializer-address-snapshot | Bin 710 -> 0 bytes
 .../flink-1.6-avro-type-serializer-snapshot        | Bin 36411 -> 0 bytes
 ...7-avro-generic-type-serializer-address-snapshot | Bin 370 -> 0 bytes
 .../flink-1.7-avro-type-serializer-address-data    | Bin 240 -> 0 bytes
 ...flink-1.7-avro-type-serializer-address-snapshot | Bin 380 -> 0 bytes
 .../src/test/resources/flink_11-kryo_registrations |  86 -------
 11 files changed, 185 insertions(+), 345 deletions(-)

diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
deleted file mode 100644
index a162aa3..0000000
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.avro.typeutils;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
-import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
-import org.apache.flink.core.memory.DataInputDeserializer;
-import org.apache.flink.formats.avro.generated.Address;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
-import org.apache.flink.testutils.migration.MigrationVersion;
-
-import org.apache.avro.generic.GenericRecord;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static java.util.Arrays.asList;
-import static junit.framework.TestCase.assertSame;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-/**
- * Tests migrations for {@link AvroSerializerSnapshot}.
- */
-@RunWith(Parameterized.class)
-public class AvroSerializerMigrationTest extends 
TypeSerializerSnapshotMigrationTestBase<Address> {
-
-       private static final String DATA_FILE_FORMAT = 
"flink-%s-avro-type-serializer-address-data";
-       private static final String SPECIFIC_SNAPSHOT_FILE_FORMAT = 
"flink-%s-avro-type-serializer-address-snapshot";
-       private static final String GENERIC_SNAPSHOT_FILE_FORMAT = 
"flink-%s-avro-generic-type-serializer-address-snapshot";
-
-       public AvroSerializerMigrationTest(TestSpecification<Address> testSpec) 
{
-               super(testSpec);
-       }
-
-       @SuppressWarnings("unchecked")
-       @Parameterized.Parameters(name = "Test Specification = {0}")
-       public static Collection<TestSpecification<?>> testSpecifications() {
-
-               final TestSpecifications testSpecifications = new 
TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
-
-               testSpecifications.add(
-                       "generic-avro-serializer",
-                       AvroSerializer.class,
-                       AvroSerializerSnapshot.class,
-                       () -> new AvroSerializer(GenericRecord.class, 
Address.getClassSchema()),
-                       testVersion -> 
String.format(GENERIC_SNAPSHOT_FILE_FORMAT, testVersion),
-                       testVersion -> String.format(DATA_FILE_FORMAT, 
testVersion),
-                       10);
-               testSpecifications.add(
-                       "specific-avro-serializer",
-                       AvroSerializer.class,
-                       AvroSerializerSnapshot.class,
-                       () -> new AvroSerializer<>(Address.class),
-                       testVersion -> 
String.format(SPECIFIC_SNAPSHOT_FILE_FORMAT, testVersion),
-                       testVersion -> String.format(DATA_FILE_FORMAT, 
testVersion),
-                       10);
-
-               return testSpecifications.get();
-       }
-
-       // 
---------------------------------------------------------------------------------------------------------------
-       // The following batch of tests are making sure that AvroSerializer 
class is able to be Java-Deserialized.
-       // see [FLINK-11436] for more information.
-
-       // Once we drop support for versions that carried snapshots with 
Java-Deserialized serializers we can drop this
-       // batch of tests.
-       // 
---------------------------------------------------------------------------------------------------------------
-
-       @Test
-       public void javaDeserializeFromFlink_1_5_ReflectiveRecord() throws 
IOException {
-               final String avroSerializerBase64 = 
"AAAAAQAAAQis7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n"
 +
-                       
"U2VyaWFsaXplcgAAAAAAAAABAgABTAAEdHlwZXQAEUxqYXZhL2xhbmcvQ2xhc3M7eHIANG9yZy5hcGFj\n"
 +
-                       
"aGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwdnIA\n"
 +
-                       
"Tm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJvU2VyaWFsaXplck1pZ3Jh\n"
 +
-                       "dGlvblRlc3QkU2ltcGxlUG9qbwAAAAAAAAAAAAAAeHA=";
-
-               TypeSerializer<?> serializer = 
javaDeserialize(avroSerializerBase64);
-               assertThat(serializer, instanceOf(AvroSerializer.class));
-
-               AvroSerializer<?> avroSerializer = (AvroSerializer<?>) 
javaDeserialize(avroSerializerBase64);
-               assertSame(avroSerializer.getType(), SimplePojo.class);
-               assertThat(avroSerializer.getAvroSchema(), notNullValue());
-       }
-
-       @Test
-       public void javaDeserializeFromFlink_1_5_SpecificRecord() throws 
IOException {
-               final String avroSerializerBase64 = 
"AAAAAQAAASOs7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n"
 +
-                       
"U2VyaWFsaXplcgAAAAAAAAABAgABTAAEdHlwZXQAEUxqYXZhL2xhbmcvQ2xhc3M7eHIANG9yZy5hcGFj\n"
 +
-                       
"aGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwdnIA\n"
 +
-                       
"L29yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLmdlbmVyYXRlZC5BZGRyZXNz7Paj+KjgQ2oMAAB4\n"
 +
-                       
"cgArb3JnLmFwYWNoZS5hdnJvLnNwZWNpZmljLlNwZWNpZmljUmVjb3JkQmFzZQKi+azGtzQdDAAAeHA=";
-
-               TypeSerializer<?> serializer = 
javaDeserialize(avroSerializerBase64);
-               assertThat(serializer, instanceOf(AvroSerializer.class));
-
-               AvroSerializer<?> avroSerializer = (AvroSerializer<?>) 
javaDeserialize(avroSerializerBase64);
-               assertSame(avroSerializer.getType(), Address.class);
-               assertThat(avroSerializer.getAvroSchema(), is(Address.SCHEMA$));
-       }
-
-       @Test
-       public void javaDeserializeFromFlink_1_6() throws IOException {
-               final String avroSerializer = 
"AAAAAQAAAUis7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n"
 +
-                       
"U2VyaWFsaXplcgAAAAAAAAABAgACTAAMc2NoZW1hU3RyaW5ndAASTGphdmEvbGFuZy9TdHJpbmc7TAAE\n"
 +
-                       
"dHlwZXQAEUxqYXZhL2xhbmcvQ2xhc3M7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBl\n"
 +
-                       
"dXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwcHZyAC9vcmcuYXBhY2hlLmZsaW5rLmZvcm1h\n"
 +
-                       
"dHMuYXZyby5nZW5lcmF0ZWQuQWRkcmVzc+z2o/io4ENqDAAAeHIAK29yZy5hcGFjaGUuYXZyby5zcGVj\n"
 +
-                       "aWZpYy5TcGVjaWZpY1JlY29yZEJhc2UCovmsxrc0HQwAAHhw";
-
-               TypeSerializer<?> avro = javaDeserialize(avroSerializer);
-
-               assertThat(avro, instanceOf(AvroSerializer.class));
-       }
-
-       @Test
-       public void javaDeserializeFromFlink_1_6_GenericRecord() throws 
IOException {
-               String avroSerializerBase64 = 
"AAAAAQAAAges7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n"
 +
-                       
"U2VyaWFsaXplcgAAAAAAAAABAgACTAAMc2NoZW1hU3RyaW5ndAASTGphdmEvbGFuZy9TdHJpbmc7TAAE\n"
 +
-                       
"dHlwZXQAEUxqYXZhL2xhbmcvQ2xhc3M7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBl\n"
 +
-                       
"dXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwdAEBeyJ0eXBlIjoicmVjb3JkIiwibmFtZSI6\n"
 +
-                       
"IkFkZHJlc3MiLCJuYW1lc3BhY2UiOiJvcmcuYXBhY2hlLmZsaW5rLmZvcm1hdHMuYXZyby5nZW5lcmF0\n"
 +
-                       
"ZWQiLCJmaWVsZHMiOlt7Im5hbWUiOiJudW0iLCJ0eXBlIjoiaW50In0seyJuYW1lIjoic3RyZWV0Iiwi\n"
 +
-                       
"dHlwZSI6InN0cmluZyJ9LHsibmFtZSI6ImNpdHkiLCJ0eXBlIjoic3RyaW5nIn0seyJuYW1lIjoic3Rh\n"
 +
-                       
"dGUiLCJ0eXBlIjoic3RyaW5nIn0seyJuYW1lIjoiemlwIiwidHlwZSI6InN0cmluZyJ9XX12cgAlb3Jn\n"
 +
-                       
"LmFwYWNoZS5hdnJvLmdlbmVyaWMuR2VuZXJpY1JlY29yZAAAAAAAAAAAAAAAeHA=";
-
-               TypeSerializer<?> serializer = 
javaDeserialize(avroSerializerBase64);
-
-               AvroSerializer<?> avroSerializer = (AvroSerializer<?>) 
serializer;
-               assertSame(avroSerializer.getType(), GenericRecord.class);
-               assertThat(avroSerializer.getAvroSchema(), notNullValue());
-       }
-
-       @Test
-       public void javaDeserializeFromFlink_1_7() throws IOException {
-               String avroSerializerBase64 = 
"AAAAAQAAAeKs7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n"
 +
-                       
"U2VyaWFsaXplcgAAAAAAAAACAgADTAAOcHJldmlvdXNTY2hlbWF0AEBMb3JnL2FwYWNoZS9mbGluay9m\n"
 +
-                       
"b3JtYXRzL2F2cm8vdHlwZXV0aWxzL1NlcmlhbGl6YWJsZUF2cm9TY2hlbWE7TAAGc2NoZW1hcQB+AAFM\n"
 +
-                       
"AAR0eXBldAARTGphdmEvbGFuZy9DbGFzczt4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5\n"
 +
-                       
"cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA+b3JnLmFwYWNoZS5mbGluay5mb3Jt\n"
 +
-                       
"YXRzLmF2cm8udHlwZXV0aWxzLlNlcmlhbGl6YWJsZUF2cm9TY2hlbWEAAAAAAAAAAQMAAHhwdwEAeHNx\n"
 +
-                       
"AH4ABXcBAHh2cgAvb3JnLmFwYWNoZS5mbGluay5mb3JtYXRzLmF2cm8uZ2VuZXJhdGVkLkFkZHJlc3Ps\n"
 +
-                       
"9qP4qOBDagwAAHhyACtvcmcuYXBhY2hlLmF2cm8uc3BlY2lmaWMuU3BlY2lmaWNSZWNvcmRCYXNlAqL5\n"
 +
-                       "rMa3NB0MAAB4cA==";
-
-               AvroSerializer<?> avroSerializer = (AvroSerializer<?>) 
javaDeserialize(avroSerializerBase64);
-               assertSame(avroSerializer.getType(), Address.class);
-               assertThat(avroSerializer.getAvroSchema(), is(Address.SCHEMA$));
-       }
-
-       @Test
-       public void javaDeserializeFromFlink_1_7_afterInitialization() throws 
IOException {
-               String avroSerializerBase64 = 
"AAAAAQAAAeKs7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n"
 +
-                       
"U2VyaWFsaXplcgAAAAAAAAACAgADTAAOcHJldmlvdXNTY2hlbWF0AEBMb3JnL2FwYWNoZS9mbGluay9m\n"
 +
-                       
"b3JtYXRzL2F2cm8vdHlwZXV0aWxzL1NlcmlhbGl6YWJsZUF2cm9TY2hlbWE7TAAGc2NoZW1hcQB+AAFM\n"
 +
-                       
"AAR0eXBldAARTGphdmEvbGFuZy9DbGFzczt4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5\n"
 +
-                       
"cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA+b3JnLmFwYWNoZS5mbGluay5mb3Jt\n"
 +
-                       
"YXRzLmF2cm8udHlwZXV0aWxzLlNlcmlhbGl6YWJsZUF2cm9TY2hlbWEAAAAAAAAAAQMAAHhwdwEAeHNx\n"
 +
-                       
"AH4ABXcBAHh2cgAvb3JnLmFwYWNoZS5mbGluay5mb3JtYXRzLmF2cm8uZ2VuZXJhdGVkLkFkZHJlc3Ps\n"
 +
-                       
"9qP4qOBDagwAAHhyACtvcmcuYXBhY2hlLmF2cm8uc3BlY2lmaWMuU3BlY2lmaWNSZWNvcmRCYXNlAqL5\n"
 +
-                       "rMa3NB0MAAB4cA==";
-
-               AvroSerializer<?> avroSerializer = (AvroSerializer<?>) 
javaDeserialize(avroSerializerBase64);
-               assertSame(avroSerializer.getType(), Address.class);
-               assertThat(avroSerializer.getAvroSchema(), is(Address.SCHEMA$));
-       }
-
-       @Test
-       public void compositeSerializerFromFlink_1_6_WithNestedAvroSerializer() 
throws IOException {
-               String streamElementSerializerBase64 = 
"AAAAAQAAAq2s7QAFc3IAR29yZy5hcGFjaGUuZmxpbmsuc3RyZWFtaW5nLnJ1bnRpbWUuc3RyZWFtcmVj\n"
 +
-                       
"b3JkLlN0cmVhbUVsZW1lbnRTZXJpYWxpemVyAAAAAAAAAAECAAFMAA50eXBlU2VyaWFsaXplcnQANkxv\n"
 +
-                       
"cmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hyADRvcmcu\n"
 +
-                       
"YXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLlR5cGVTZXJpYWxpemVyAAAAAAAAAAECAAB4\n"
 +
-                       
"cHNyADZvcmcuYXBhY2hlLmZsaW5rLmZvcm1hdHMuYXZyby50eXBldXRpbHMuQXZyb1NlcmlhbGl6ZXIA\n"
 +
-                       
"AAAAAAAAAQIAAkwADHNjaGVtYVN0cmluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO0wABHR5cGV0ABFMamF2\n"
 +
-                       
"YS9sYW5nL0NsYXNzO3hxAH4AAnQBAXsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJBZGRyZXNzIiwibmFt\n"
 +
-                       
"ZXNwYWNlIjoib3JnLmFwYWNoZS5mbGluay5mb3JtYXRzLmF2cm8uZ2VuZXJhdGVkIiwiZmllbGRzIjpb\n"
 +
-                       
"eyJuYW1lIjoibnVtIiwidHlwZSI6ImludCJ9LHsibmFtZSI6InN0cmVldCIsInR5cGUiOiJzdHJpbmci\n"
 +
-                       
"fSx7Im5hbWUiOiJjaXR5IiwidHlwZSI6InN0cmluZyJ9LHsibmFtZSI6InN0YXRlIiwidHlwZSI6InN0\n"
 +
-                       
"cmluZyJ9LHsibmFtZSI6InppcCIsInR5cGUiOiJzdHJpbmcifV19dnIAJW9yZy5hcGFjaGUuYXZyby5n\n"
 +
-                       "ZW5lcmljLkdlbmVyaWNSZWNvcmQAAAAAAAAAAAAAAHhw";
-
-               StreamElementSerializer<?> ser = (StreamElementSerializer<?>) 
javaDeserialize(streamElementSerializerBase64);
-               TypeSerializer<?> containedTypeSerializer = 
ser.getContainedTypeSerializer();
-
-               assertThat(containedTypeSerializer, 
instanceOf(AvroSerializer.class));
-
-               AvroSerializer<?> avroSerializer = (AvroSerializer<?>) 
containedTypeSerializer;
-               assertSame(avroSerializer.getType(), GenericRecord.class);
-               assertThat(avroSerializer.getAvroSchema(), is(Address.SCHEMA$));
-       }
-
-       @Test
-       public void makeSureThatFieldsWereNotChanged() {
-               // This test should be removed once we completely migrate all 
the composite serializers.
-
-               List<String> serializedFieldNames = 
Arrays.stream(AvroSerializer.class.getDeclaredFields())
-                       .filter(field -> 
!Modifier.isTransient(field.getModifiers()))
-                       .filter(field -> 
!Modifier.isStatic(field.getModifiers()))
-                       .map(Field::getName)
-                       .sorted()
-                       .collect(Collectors.toList());
-
-               assertThat(serializedFieldNames, is(asList("previousSchema", 
"schema", "type")));
-       }
-
-       @SuppressWarnings("deprecation")
-       private static TypeSerializer<?> javaDeserialize(String base64) throws 
IOException {
-               byte[] bytes = Base64.getMimeDecoder().decode(base64);
-               DataInputDeserializer in = new DataInputDeserializer(bytes);
-               return TypeSerializerSerializationUtil.tryReadSerializer(in, 
Thread.currentThread().getContextClassLoader());
-       }
-
-       /**
-        * A simple pojo used in these tests.
-        */
-       public static class SimplePojo {
-               private String foo;
-
-               @SuppressWarnings("unused")
-               public String getFoo() {
-                       return foo;
-               }
-
-               @SuppressWarnings("unused")
-               public void setFoo(String foo) {
-                       this.foo = foo;
-               }
-       }
-}
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerUpgradeTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerUpgradeTest.java
new file mode 100644
index 0000000..f647c92
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerUpgradeTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerMatchers;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.hamcrest.Matcher;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Tests based on {@link TypeSerializerUpgradeTestBase} for the {@link 
AvroSerializer}.
+ */
+@RunWith(Parameterized.class)
+public class AvroSerializerUpgradeTest extends 
TypeSerializerUpgradeTestBase<Object, Object> {
+
+       public AvroSerializerUpgradeTest(TestSpecification<Object, Object> 
testSpecification) {
+               super(testSpecification);
+       }
+
+       @Parameterized.Parameters(name = "Test Specification = {0}")
+       public static Collection<TestSpecification<?, ?>> testSpecifications() 
throws Exception {
+               ArrayList<TestSpecification<?, ?>> testSpecifications = new 
ArrayList<>();
+               for (MigrationVersion migrationVersion : migrationVersions) {
+                       testSpecifications.add(
+                                       new TestSpecification<>(
+                                                       
"generic-avro-serializer",
+                                                       migrationVersion,
+                                                       
GenericAvroSerializerSetup.class,
+                                                       
GenericAvroSerializerVerifier.class));
+
+                       testSpecifications.add(
+                                       new TestSpecification<>(
+                                                       
"specific-avro-serializer",
+                                                       migrationVersion,
+                                                       
SpecificAvroSerializerSetup.class,
+                                                       
SpecificAvroSerializerVerifier.class));
+               }
+
+               return testSpecifications;
+       }
+
+       // 
----------------------------------------------------------------------------------------------
+       //  Specification for "generic-avro-serializer"
+       // 
----------------------------------------------------------------------------------------------
+
+       /**
+        * This class is only public to work with {@link 
org.apache.flink.api.common.typeutils.ClassRelocator}.
+        */
+       public static final class GenericAvroSerializerSetup implements 
TypeSerializerUpgradeTestBase.PreUpgradeSetup<GenericRecord> {
+
+               @Override
+               public TypeSerializer<GenericRecord> createPriorSerializer() {
+                       return new AvroSerializer<>(
+                                       GenericRecord.class,
+                                       Address.getClassSchema());
+               }
+
+               @Override
+               public GenericRecord createTestData() {
+                       GenericData.Record record = new 
GenericData.Record(Address.getClassSchema());
+                       record.put("num", 239);
+                       record.put("street", "Baker Street");
+                       record.put("city", "London");
+                       record.put("state", "London");
+                       record.put("zip", "NW1 6XE");
+                       return record;
+               }
+       }
+
+       /**
+        * This class is only public to work with {@link 
org.apache.flink.api.common.typeutils.ClassRelocator}.
+        */
+       public static final class GenericAvroSerializerVerifier implements 
TypeSerializerUpgradeTestBase.UpgradeVerifier<GenericRecord> {
+
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               @Override
+               public TypeSerializer<GenericRecord> createUpgradedSerializer() 
{
+                       return new AvroSerializer(
+                                       GenericRecord.class,
+                                       Address.getClassSchema());
+               }
+
+               @Override
+               public Matcher<GenericRecord> testDataMatcher() {
+                       GenericData.Record record = new 
GenericData.Record(Address.getClassSchema());
+                       record.put("num", 239);
+                       record.put("street", "Baker Street");
+                       record.put("city", "London");
+                       record.put("state", "London");
+                       record.put("zip", "NW1 6XE");
+                       return is(record);
+               }
+
+               @Override
+               public 
Matcher<TypeSerializerSchemaCompatibility<GenericRecord>> 
schemaCompatibilityMatcher(MigrationVersion version) {
+                       return TypeSerializerMatchers.isCompatibleAsIs();
+               }
+       }
+
+       // 
----------------------------------------------------------------------------------------------
+       //  Specification for "specific-avro-serializer"
+       // 
----------------------------------------------------------------------------------------------
+
+       /**
+        * This class is only public to work with {@link 
org.apache.flink.api.common.typeutils.ClassRelocator}.
+        */
+       public static final class SpecificAvroSerializerSetup implements 
TypeSerializerUpgradeTestBase.PreUpgradeSetup<Address> {
+
+               @Override
+               public TypeSerializer<Address> createPriorSerializer() {
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       AvroSerializer<Address> avroSerializer = new 
AvroSerializer(Address.class);
+                       return avroSerializer;
+               }
+
+               @Override
+               public Address createTestData() {
+                       Address addr = new Address();
+                       addr.setNum(239);
+                       addr.setStreet("Baker Street");
+                       addr.setCity("London");
+                       addr.setState("London");
+                       addr.setZip("NW1 6XE");
+                       return addr;
+               }
+       }
+
+       /**
+        * This class is only public to work with {@link 
org.apache.flink.api.common.typeutils.ClassRelocator}.
+        */
+       public static final class SpecificAvroSerializerVerifier implements 
TypeSerializerUpgradeTestBase.UpgradeVerifier<Address> {
+
+               @Override
+               public TypeSerializer<Address> createUpgradedSerializer() {
+                       @SuppressWarnings({"unchecked", "rawtypes"})
+                       AvroSerializer<Address> avroSerializer = new 
AvroSerializer(Address.class);
+                       return avroSerializer;
+               }
+
+               @Override
+               public Matcher<Address> testDataMatcher() {
+                       Address addr = new Address();
+                       addr.setNum(239);
+                       addr.setStreet("Baker Street");
+                       addr.setCity("London");
+                       addr.setState("London");
+                       addr.setZip("NW1 6XE");
+                       return is(addr);
+               }
+
+               @Override
+               public Matcher<TypeSerializerSchemaCompatibility<Address>> 
schemaCompatibilityMatcher(MigrationVersion version) {
+                       return TypeSerializerMatchers.isCompatibleAsIs();
+               }
+       }
+}
diff --git 
a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-generic-type-serializer-address-snapshot
 
b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-generic-type-serializer-address-snapshot
deleted file mode 100644
index 57cbc42..0000000
Binary files 
a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-generic-type-serializer-address-snapshot
 and /dev/null differ
diff --git 
a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data
 
b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data
deleted file mode 100644
index 23853cf..0000000
Binary files 
a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data
 and /dev/null differ
diff --git 
a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-address-data
 
b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-address-data
deleted file mode 100644
index 74acf72..0000000
Binary files 
a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-address-data
 and /dev/null differ
diff --git 
a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-address-snapshot
 
b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-address-snapshot
deleted file mode 100644
index d68be81..0000000
Binary files 
a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-address-snapshot
 and /dev/null differ
diff --git 
a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot
 
b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot
deleted file mode 100644
index 1474300..0000000
Binary files 
a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot
 and /dev/null differ
diff --git 
a/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-generic-type-serializer-address-snapshot
 
b/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-generic-type-serializer-address-snapshot
deleted file mode 100644
index f27d2dc..0000000
Binary files 
a/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-generic-type-serializer-address-snapshot
 and /dev/null differ
diff --git 
a/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-data
 
b/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-data
deleted file mode 100644
index 74acf72..0000000
Binary files 
a/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-data
 and /dev/null differ
diff --git 
a/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-snapshot
 
b/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-snapshot
deleted file mode 100644
index 7c8f6c2..0000000
Binary files 
a/flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-snapshot
 and /dev/null differ
diff --git 
a/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations 
b/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations
deleted file mode 100644
index 7000e62..0000000
--- a/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations
+++ /dev/null
@@ -1,86 +0,0 @@
-0,int
-1,java.lang.String
-2,float
-3,boolean
-4,byte
-5,char
-6,short
-7,long
-8,double
-9,void
-10,scala.collection.convert.Wrappers$SeqWrapper
-11,scala.collection.convert.Wrappers$IteratorWrapper
-12,scala.collection.convert.Wrappers$MapWrapper
-13,scala.collection.convert.Wrappers$JListWrapper
-14,scala.collection.convert.Wrappers$JMapWrapper
-15,scala.Some
-16,scala.util.Left
-17,scala.util.Right
-18,scala.collection.immutable.Vector
-19,scala.collection.immutable.Set$Set1
-20,scala.collection.immutable.Set$Set2
-21,scala.collection.immutable.Set$Set3
-22,scala.collection.immutable.Set$Set4
-23,scala.collection.immutable.HashSet$HashTrieSet
-24,scala.collection.immutable.Map$Map1
-25,scala.collection.immutable.Map$Map2
-26,scala.collection.immutable.Map$Map3
-27,scala.collection.immutable.Map$Map4
-28,scala.collection.immutable.HashMap$HashTrieMap
-29,scala.collection.immutable.Range$Inclusive
-30,scala.collection.immutable.NumericRange$Inclusive
-31,scala.collection.immutable.NumericRange$Exclusive
-32,scala.collection.mutable.BitSet
-33,scala.collection.mutable.HashMap
-34,scala.collection.mutable.HashSet
-35,scala.collection.convert.Wrappers$IterableWrapper
-36,scala.Tuple1
-37,scala.Tuple2
-38,scala.Tuple3
-39,scala.Tuple4
-40,scala.Tuple5
-41,scala.Tuple6
-42,scala.Tuple7
-43,scala.Tuple8
-44,scala.Tuple9
-45,scala.Tuple10
-46,scala.Tuple11
-47,scala.Tuple12
-48,scala.Tuple13
-49,scala.Tuple14
-50,scala.Tuple15
-51,scala.Tuple16
-52,scala.Tuple17
-53,scala.Tuple18
-54,scala.Tuple19
-55,scala.Tuple20
-56,scala.Tuple21
-57,scala.Tuple22
-58,scala.Tuple1$mcJ$sp
-59,scala.Tuple1$mcI$sp
-60,scala.Tuple1$mcD$sp
-61,scala.Tuple2$mcJJ$sp
-62,scala.Tuple2$mcJI$sp
-63,scala.Tuple2$mcJD$sp
-64,scala.Tuple2$mcIJ$sp
-65,scala.Tuple2$mcII$sp
-66,scala.Tuple2$mcID$sp
-67,scala.Tuple2$mcDJ$sp
-68,scala.Tuple2$mcDI$sp
-69,scala.Tuple2$mcDD$sp
-70,scala.Symbol
-71,scala.reflect.ClassTag
-72,scala.runtime.BoxedUnit
-73,java.util.Arrays$ArrayList
-74,java.util.BitSet
-75,java.util.PriorityQueue
-76,java.util.regex.Pattern
-77,java.sql.Date
-78,java.sql.Time
-79,java.sql.Timestamp
-80,java.net.URI
-81,java.net.InetSocketAddress
-82,java.util.UUID
-83,java.util.Locale
-84,java.text.SimpleDateFormat
-85,org.apache.avro.generic.GenericData$Array

Reply via email to