[ 
https://issues.apache.org/jira/browse/FLINK-10605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668745#comment-16668745
 ] 

ASF GitHub Bot commented on FLINK-10605:
----------------------------------------

asfgit closed pull request #6881: [FLINK-10605] [core] Upgrade AvroSerializer 
snapshot to implement new TypeSerializerSnapshot interface
URL: https://github.com/apache/flink/pull/6881
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
deleted file mode 100644
index 03bacfab134..00000000000
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.annotation.Public;
-
-import org.apache.avro.specific.SpecificRecordBase;
-
-import static 
org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema;
-
-/**
- * @deprecated Please use 
<code>org.apache.flink.formats.avro.typeutils.AvroTypeInfo</code>
- * in the <code>flink-avro</code> module. This class will be removed in the 
near future.
- */
-@Deprecated
-@Public
-public class AvroTypeInfo<T extends SpecificRecordBase> extends 
PojoTypeInfo<T> {
-
-       public AvroTypeInfo(Class<T> typeClass) {
-               super(typeClass, generateFieldsFromAvroSchema(typeClass, true));
-       }
-}
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
deleted file mode 100644
index 5f76b094361..00000000000
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.formats.avro.utils.DataInputDecoder;
-import org.apache.flink.formats.avro.utils.DataOutputEncoder;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.util.Utf8;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Old deprecated Avro serializer. It is retained for a smoother experience 
when
- * upgrading from an earlier Flink savepoint that stored this serializer.
- */
-@Internal
-@Deprecated
-@SuppressWarnings({"unused", "deprecation"})
-public final class AvroSerializer<T> extends TypeSerializer<T> {
-
-       private static final long serialVersionUID = 1L;
-
-       private final Class<T> type;
-
-       private final Class<? extends T> typeToInstantiate;
-
-       /**
-        * Map of class tag (using classname as tag) to their Kryo registration.
-        *
-        * <p>This map serves as a preview of the final registration result of
-        * the Kryo instance, taking into account registration overwrites.
-        */
-       private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
-
-       private transient ReflectDatumWriter<T> writer;
-       private transient ReflectDatumReader<T> reader;
-
-       private transient DataOutputEncoder encoder;
-       private transient DataInputDecoder decoder;
-
-       private transient Kryo kryo;
-
-       private transient T deepCopyInstance;
-
-       // 
--------------------------------------------------------------------------------------------
-
-       public AvroSerializer(Class<T> type) {
-               this(type, type);
-       }
-
-       public AvroSerializer(Class<T> type, Class<? extends T> 
typeToInstantiate) {
-               this.type = checkNotNull(type);
-               this.typeToInstantiate = checkNotNull(typeToInstantiate);
-
-               InstantiationUtil.checkForInstantiation(typeToInstantiate);
-
-               this.kryoRegistrations = buildKryoRegistrations(type);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public AvroSerializer<T> duplicate() {
-               return new AvroSerializer<>(type, typeToInstantiate);
-       }
-
-       @Override
-       public T createInstance() {
-               return InstantiationUtil.instantiate(this.typeToInstantiate);
-       }
-
-       @Override
-       public T copy(T from) {
-               checkKryoInitialized();
-
-               return KryoUtils.copy(from, kryo, this);
-       }
-
-       @Override
-       public T copy(T from, T reuse) {
-               checkKryoInitialized();
-
-               return KryoUtils.copy(from, reuse, kryo, this);
-       }
-
-       @Override
-       public int getLength() {
-               return -1;
-       }
-
-       @Override
-       public void serialize(T value, DataOutputView target) throws 
IOException {
-               checkAvroInitialized();
-               this.encoder.setOut(target);
-               this.writer.write(value, this.encoder);
-       }
-
-       @Override
-       public T deserialize(DataInputView source) throws IOException {
-               checkAvroInitialized();
-               this.decoder.setIn(source);
-               return this.reader.read(null, this.decoder);
-       }
-
-       @Override
-       public T deserialize(T reuse, DataInputView source) throws IOException {
-               checkAvroInitialized();
-               this.decoder.setIn(source);
-               return this.reader.read(reuse, this.decoder);
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               checkAvroInitialized();
-
-               if (this.deepCopyInstance == null) {
-                       this.deepCopyInstance = 
InstantiationUtil.instantiate(type, Object.class);
-               }
-
-               this.decoder.setIn(source);
-               this.encoder.setOut(target);
-
-               T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
-               this.writer.write(tmp, this.encoder);
-       }
-
-       private void checkAvroInitialized() {
-               if (this.reader == null) {
-                       this.reader = new ReflectDatumReader<>(type);
-                       this.writer = new ReflectDatumWriter<>(type);
-                       this.encoder = new DataOutputEncoder();
-                       this.decoder = new DataInputDecoder();
-               }
-       }
-
-       private void checkKryoInitialized() {
-               if (this.kryo == null) {
-                       this.kryo = new Kryo();
-
-                       Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
-                       
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
-                       kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-                       kryo.setAsmEnabled(true);
-
-                       KryoUtils.applyRegistrations(kryo, 
kryoRegistrations.values());
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public int hashCode() {
-               return 31 * this.type.hashCode() + 
this.typeToInstantiate.hashCode();
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof AvroSerializer) {
-                       @SuppressWarnings("unchecked")
-                       AvroSerializer<T> avroSerializer = (AvroSerializer<T>) 
obj;
-
-                       return avroSerializer.canEqual(this) &&
-                                       type == avroSerializer.type &&
-                                       typeToInstantiate == 
avroSerializer.typeToInstantiate;
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof AvroSerializer;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // Serializer configuration snapshotting & compatibility
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
-               return new AvroSerializerConfigSnapshot<>(type, 
typeToInstantiate, kryoRegistrations);
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-               if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
-                       final AvroSerializerConfigSnapshot<T> config = 
(AvroSerializerConfigSnapshot<T>) configSnapshot;
-
-                       if (type.equals(config.getTypeClass()) && 
typeToInstantiate.equals(config.getTypeToInstantiate())) {
-                               // resolve Kryo registrations; currently, since 
the Kryo registrations in Avro
-                               // are fixed, there shouldn't be a problem with 
the resolution here.
-
-                               LinkedHashMap<String, KryoRegistration> 
oldRegistrations = config.getKryoRegistrations();
-                               oldRegistrations.putAll(kryoRegistrations);
-
-                               for (Map.Entry<String, KryoRegistration> 
reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
-                                       if 
(reconfiguredRegistrationEntry.getValue().isDummy()) {
-                                               return 
CompatibilityResult.requiresMigration();
-                                       }
-                               }
-
-                               this.kryoRegistrations = oldRegistrations;
-                               return CompatibilityResult.compatible();
-                       }
-               }
-
-               // ends up here if the preceding serializer is not
-               // the ValueSerializer, or serialized data type has changed
-               return CompatibilityResult.requiresMigration();
-       }
-
-       /**
-        * Config snapshot for this serializer.
-        */
-       public static class AvroSerializerConfigSnapshot<T> extends 
KryoRegistrationSerializerConfigSnapshot<T> {
-
-               private static final int VERSION = 1;
-
-               private Class<? extends T> typeToInstantiate;
-
-               public AvroSerializerConfigSnapshot() {}
-
-               public AvroSerializerConfigSnapshot(
-                               Class<T> baseType,
-                               Class<? extends T> typeToInstantiate,
-                               LinkedHashMap<String, KryoRegistration> 
kryoRegistrations) {
-
-                       super(baseType, kryoRegistrations);
-                       this.typeToInstantiate = 
Preconditions.checkNotNull(typeToInstantiate);
-               }
-
-               @Override
-               public void write(DataOutputView out) throws IOException {
-                       super.write(out);
-
-                       out.writeUTF(typeToInstantiate.getName());
-               }
-
-               @SuppressWarnings("unchecked")
-               @Override
-               public void read(DataInputView in) throws IOException {
-                       super.read(in);
-
-                       String classname = in.readUTF();
-                       try {
-                               typeToInstantiate = (Class<? extends T>) 
Class.forName(classname, true, getUserCodeClassLoader());
-                       } catch (ClassNotFoundException e) {
-                               throw new IOException("Cannot find requested 
class " + classname + " in classpath.", e);
-                       }
-               }
-
-               @Override
-               public int getVersion() {
-                       return VERSION;
-               }
-
-               public Class<? extends T> getTypeToInstantiate() {
-                       return typeToInstantiate;
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               in.defaultReadObject();
-
-               // kryoRegistrations may be null if this Avro serializer is 
deserialized from an old version
-               if (kryoRegistrations == null) {
-                       this.kryoRegistrations = buildKryoRegistrations(type);
-               }
-       }
-
-       private static <T> LinkedHashMap<String, KryoRegistration> 
buildKryoRegistrations(Class<T> serializedDataType) {
-               final LinkedHashMap<String, KryoRegistration> registrations = 
new LinkedHashMap<>();
-
-               // register Avro types.
-               registrations.put(
-                               GenericData.Array.class.getName(),
-                               new KryoRegistration(
-                                               GenericData.Array.class,
-                                               new 
ExecutionConfig.SerializableSerializer<>(new 
Serializers.SpecificInstanceCollectionSerializerForArrayList())));
-               registrations.put(Utf8.class.getName(), new 
KryoRegistration(Utf8.class));
-               registrations.put(GenericData.EnumSymbol.class.getName(), new 
KryoRegistration(GenericData.EnumSymbol.class));
-               registrations.put(GenericData.Fixed.class.getName(), new 
KryoRegistration(GenericData.Fixed.class));
-               registrations.put(GenericData.StringType.class.getName(), new 
KryoRegistration(GenericData.StringType.class));
-
-               // register the serialized data type
-               registrations.put(serializedDataType.getName(), new 
KryoRegistration(serializedDataType));
-
-               return registrations;
-       }
-}
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
index 2d06476d1c5..a9bdcee8bb5 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
@@ -164,7 +164,7 @@ public boolean isEndOfStream(T nextElement) {
        @SuppressWarnings("unchecked")
        public TypeInformation<T> getProducedType() {
                if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
-                       return new AvroTypeInfo(recordClazz, false);
+                       return new AvroTypeInfo(recordClazz);
                } else {
                        return (TypeInformation<T>) new 
GenericRecordAvroTypeInfo(this.reader);
                }
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
new file mode 100644
index 00000000000..0ca25bfaa04
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.formats.avro.utils.DataInputDecoder;
+import org.apache.flink.formats.avro.utils.DataOutputEncoder;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.Nullable;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Creates Avro {@link DatumReader} and {@link DatumWriter}.
+ *
+ * @param <T> The type to be serialized.
+ */
+@Internal
+final class AvroFactory<T> {
+
+       private final DataOutputEncoder encoder = new DataOutputEncoder();
+       private final DataInputDecoder decoder = new DataInputDecoder();
+
+       private final GenericData avroData;
+       private final Schema schema;
+       private final DatumWriter<T> writer;
+       private final DatumReader<T> reader;
+
+       /**
+        * Creates Avro Writer and Reader for a specific type.
+        *
+        * <p>Given an input type, and possibly the current schema, and a 
previously known schema (also known as writer
+        * schema) create will deduce the best way to initialize a reader and 
writer according to the following rules:
+        * <ul>
+        * <li>If type is an Avro generated class (a {@link SpecificRecord}) 
then the reader would use the
+        * previousSchema for reading (if present) otherwise it would use the 
schema attached to the auto generated
+        * class.
+        * <li>If the type is a GenericRecord then the reader and the writer 
would be created with the supplied
+        * (mandatory) schema.
+        * <li>Otherwise, we use Avro's reflection based reader and writer that 
would deduce the schema via reflection.
+        * If the previous schema is also present (when restoring a serializer 
for example) then the reader would be
+        * created with both schemas.
+        * </ul>
+        */
+       static <T> AvroFactory<T> create(Class<T> type, @Nullable Schema 
currentSchema, @Nullable Schema previousSchema) {
+               final ClassLoader cl = 
Thread.currentThread().getContextClassLoader();
+
+               if (SpecificRecord.class.isAssignableFrom(type)) {
+                       return fromSpecific(type, cl, 
Optional.ofNullable(previousSchema));
+               }
+               if (GenericRecord.class.isAssignableFrom(type)) {
+                       return fromGeneric(cl, currentSchema);
+               }
+               return fromReflective(type, cl, 
Optional.ofNullable(previousSchema));
+       }
+
+       static <T> AvroFactory<T> createFromTypeAndSchemaString(Class<T> type, 
@Nullable String schemaString) {
+               Schema schema = (schemaString != null) ? new 
Schema.Parser().parse(schemaString) : null;
+               return create(type, schema, null);
+       }
+
+       @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+       private static <T> AvroFactory<T> fromSpecific(Class<T> type, 
ClassLoader cl, Optional<Schema> previousSchema) {
+               SpecificData specificData = new SpecificData(cl);
+               Schema newSchema = specificData.getSchema(type);
+
+               return new AvroFactory<>(
+                       specificData,
+                       newSchema,
+                       new 
SpecificDatumReader<>(previousSchema.orElse(newSchema), newSchema, 
specificData),
+                       new SpecificDatumWriter<>(newSchema, specificData)
+               );
+       }
+
+       private static <T> AvroFactory<T> fromGeneric(ClassLoader cl, Schema 
schema) {
+               checkNotNull(schema,
+                       "Unable to create an AvroSerializer with a 
GenericRecord type without a schema");
+               GenericData genericData = new GenericData(cl);
+
+               return new AvroFactory<>(
+                       genericData,
+                       schema,
+                       new GenericDatumReader<>(schema, schema, genericData),
+                       new GenericDatumWriter<>(schema, genericData)
+               );
+       }
+
+       @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+       private static <T> AvroFactory<T> fromReflective(Class<T> type, 
ClassLoader cl, Optional<Schema> previousSchema) {
+               ReflectData reflectData = new ReflectData(cl);
+               Schema newSchema = reflectData.getSchema(type);
+
+               return new AvroFactory<>(
+                       reflectData,
+                       newSchema,
+                       new 
ReflectDatumReader<>(previousSchema.orElse(newSchema), newSchema, reflectData),
+                       new ReflectDatumWriter<>(newSchema, reflectData)
+               );
+       }
+
+       private AvroFactory(
+               GenericData avroData,
+               Schema schema,
+               DatumReader<T> reader,
+               DatumWriter<T> writer) {
+
+               this.avroData = checkNotNull(avroData);
+               this.schema = checkNotNull(schema);
+               this.writer = checkNotNull(writer);
+               this.reader = checkNotNull(reader);
+       }
+
+       DataOutputEncoder getEncoder() {
+               return encoder;
+       }
+
+       DataInputDecoder getDecoder() {
+               return decoder;
+       }
+
+       Schema getSchema() {
+               return schema;
+       }
+
+       DatumWriter<T> getWriter() {
+               return writer;
+       }
+
+       DatumReader<T> getReader() {
+               return reader;
+       }
+
+       GenericData getAvroData() {
+               return avroData;
+       }
+}
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
index dad1d6df16a..37aa2d3bc44 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.formats.avro.typeutils;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import 
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.formats.avro.utils.DataInputDecoder;
@@ -33,15 +34,10 @@
 import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
 import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.Nullable;
 import org.apache.avro.specific.SpecificRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,29 +74,34 @@
         * Because this flag is static final, a value of 'false' allows the JIT 
compiler to eliminate
         * the guarded code sections. */
        private static final boolean CONCURRENT_ACCESS_CHECK =
-                       LOG.isDebugEnabled() || 
AvroSerializerDebugInitHelper.setToDebug;
+               LOG.isDebugEnabled() || 
AvroSerializerDebugInitHelper.setToDebug;
 
        // -------- configuration fields, serializable -----------
 
        /** The class of the type that is serialized by this serializer. */
        private final Class<T> type;
+       private final SerializableAvroSchema schema;
+       private final SerializableAvroSchema previousSchema;
 
-       private final String schemaString;
+       /** This field was present in this class prior to 1.7, and held the 
string representation of
+        * a {@link Schema} (only in the case of an Avro GenericRecord). Since 
{@code FsStateBackend} stores the serializer
+        * (via Java serialization) within the checkpoint to later restore the 
state, we need to have this field here.
+        * See {@link #initializeAvro()}.
+        */
+       @Deprecated
+       private final String schemaString = null;
 
        // -------- runtime fields, non-serializable, lazily initialized 
-----------
 
-       private transient GenericDatumWriter<T> writer;
-       private transient GenericDatumReader<T> reader;
-
+       private transient GenericData avroData;
+       private transient DatumWriter<T> writer;
        private transient DataOutputEncoder encoder;
        private transient DataInputDecoder decoder;
-
-       private transient GenericData avroData;
-
-       private transient Schema schema;
+       private transient DatumReader<T> reader;
+       private transient Schema runtimeSchema;
 
        /** The serializer configuration snapshot, cached for efficiency. */
-       private transient AvroSchemaSerializerConfigSnapshot<T> configSnapshot;
+       private transient TypeSerializerSnapshot<T> configSnapshot;
 
        /** The currently accessing thread, set and checked on debug level 
only. */
        private transient volatile Thread currentThread;
@@ -113,10 +114,9 @@
         * For serializing {@link GenericData.Record} use {@link 
AvroSerializer#AvroSerializer(Class, Schema)}
         */
        public AvroSerializer(Class<T> type) {
+               this(checkNotNull(type), new SerializableAvroSchema(), new 
SerializableAvroSchema());
                checkArgument(!isGenericRecord(type),
                        "For GenericData.Record use constructor with explicit 
schema.");
-               this.type = checkNotNull(type);
-               this.schemaString = null;
        }
 
        /**
@@ -126,11 +126,19 @@ public AvroSerializer(Class<T> type) {
         * {@link AvroSerializer#AvroSerializer(Class)}
         */
        public AvroSerializer(Class<T> type, Schema schema) {
+               this(checkNotNull(type), new 
SerializableAvroSchema(checkNotNull(schema)), new SerializableAvroSchema());
                checkArgument(isGenericRecord(type),
                        "For classes other than GenericData.Record use 
constructor without explicit schema.");
+       }
+
+       /**
+        * Creates a new AvroSerializer for the type indicated by the given 
class.
+        */
+       @Internal
+       AvroSerializer(Class<T> type, @Nullable SerializableAvroSchema 
newSchema, @Nullable SerializableAvroSchema previousSchema) {
                this.type = checkNotNull(type);
-               this.schema = checkNotNull(schema);
-               this.schemaString = schema.toString();
+               this.schema = newSchema;
+               this.previousSchema = previousSchema;
        }
 
        /**
@@ -167,7 +175,7 @@ public int getLength() {
        // 
------------------------------------------------------------------------
 
        @Override
-       public T createInstance() {
+       public T createInstance()  {
                return InstantiationUtil.instantiate(type);
        }
 
@@ -237,7 +245,7 @@ public T copy(T from) {
 
                try {
                        checkAvroInitialized();
-                       return avroData.deepCopy(schema, from);
+                       return avroData.deepCopy(runtimeSchema, from);
                }
                finally {
                        if (CONCURRENT_ACCESS_CHECK) {
@@ -264,10 +272,10 @@ public void copy(DataInputView source, DataOutputView 
target) throws IOException
        // 
------------------------------------------------------------------------
 
        @Override
-       public TypeSerializerConfigSnapshot<T> snapshotConfiguration() {
+       public TypeSerializerSnapshot<T> snapshotConfiguration() {
                if (configSnapshot == null) {
                        checkAvroInitialized();
-                       configSnapshot = new 
AvroSchemaSerializerConfigSnapshot<>(schema.toString(false));
+                       configSnapshot = new 
AvroSerializerSnapshot<>(runtimeSchema, type);
                }
                return configSnapshot;
        }
@@ -282,18 +290,10 @@ public void copy(DataInputView source, DataOutputView 
target) throws IOException
 
                        checkAvroInitialized();
                        final SchemaPairCompatibility compatibility =
-                                       
SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
+                               
SchemaCompatibility.checkReaderWriterCompatibility(runtimeSchema, lastSchema);
 
                        return compatibility.getType() == 
SchemaCompatibilityType.COMPATIBLE ?
-                                       CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
-               }
-               else if (configSnapshot instanceof 
AvroSerializerConfigSnapshot) {
-                       // old snapshot case, just compare the type
-                       // we don't need to restore any Kryo stuff, since Kryo 
was never used for persistence,
-                       // only for object-to-object copies.
-                       final AvroSerializerConfigSnapshot<T> old = 
(AvroSerializerConfigSnapshot<T>) configSnapshot;
-                       return type.equals(old.getTypeClass()) ?
-                                       CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
+                               CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
                }
                else {
                        return CompatibilityResult.requiresMigration();
@@ -304,19 +304,15 @@ else if (configSnapshot instanceof 
AvroSerializerConfigSnapshot) {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       private static boolean isGenericRecord(Class<?> type) {
+       static boolean isGenericRecord(Class<?> type) {
                return !SpecificRecord.class.isAssignableFrom(type) &&
                        GenericRecord.class.isAssignableFrom(type);
        }
 
        @Override
        public TypeSerializer<T> duplicate() {
-               if (schemaString != null) {
-                       return new AvroSerializer<>(type, schema);
-               } else {
-                       return new AvroSerializer<>(type);
-
-               }
+               checkAvroInitialized();
+               return new AvroSerializer<>(type, new 
SerializableAvroSchema(runtimeSchema), previousSchema);
        }
 
        @Override
@@ -359,32 +355,26 @@ private void checkAvroInitialized() {
        }
 
        private void initializeAvro() {
-               final ClassLoader cl = 
Thread.currentThread().getContextClassLoader();
-
-               if (SpecificRecord.class.isAssignableFrom(type)) {
-                       SpecificData specificData = new SpecificData(cl);
-                       this.avroData = specificData;
-                       this.schema = specificData.getSchema(type);
-                       this.reader = new SpecificDatumReader<>(schema, schema, 
specificData);
-                       this.writer = new SpecificDatumWriter<>(schema, 
specificData);
-               } else if (GenericRecord.class.isAssignableFrom(type)) {
-                       if (schema == null) {
-                               this.schema = new 
Schema.Parser().parse(schemaString);
-                       }
-                       GenericData genericData = new GenericData(cl);
-                       this.avroData = genericData;
-                       this.reader = new GenericDatumReader<>(schema, schema, 
genericData);
-                       this.writer = new GenericDatumWriter<>(schema, 
genericData);
-               } else {
-                       final ReflectData reflectData = new ReflectData(cl);
-                       this.avroData = reflectData;
-                       this.schema = reflectData.getSchema(type);
-                       this.reader = new ReflectDatumReader<>(schema, schema, 
reflectData);
-                       this.writer = new ReflectDatumWriter<>(schema, 
reflectData);
-               }
-
-               this.encoder = new DataOutputEncoder();
-               this.decoder = new DataInputDecoder();
+               final AvroFactory<T> factory;
+               if (wasThisInstanceDeserializedFromAPre17Version()) {
+                       // since schema is a final field that is initialized to 
a non null value,
+                       // this can only have happened when restoring from a 
checkpoint in an FsStateBackend pre Flink 1.7.
+                       // To maintain backwards compatibility we need to use 
the information stored at schemaString.
+                       factory = 
AvroFactory.createFromTypeAndSchemaString(type, schemaString);
+               }
+               else {
+                       factory = AvroFactory.create(type, 
schema.getAvroSchema(), previousSchema.getAvroSchema());
+               }
+               this.runtimeSchema = factory.getSchema();
+               this.writer = factory.getWriter();
+               this.reader = factory.getReader();
+               this.encoder = factory.getEncoder();
+               this.decoder = factory.getDecoder();
+               this.avroData = factory.getAvroData();
+       }
+
+       private boolean wasThisInstanceDeserializedFromAPre17Version() {
+               return (schema == null);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -403,8 +393,8 @@ private void enterExclusiveThread() {
                }
                else if (previous != thisThread) {
                        throw new IllegalStateException(
-                                       "Concurrent access to KryoSerializer. 
Thread 1: " + thisThread.getName() +
-                                                       " , Thread 2: " + 
previous.getName());
+                               "Concurrent access to KryoSerializer. Thread 1: 
" + thisThread.getName() +
+                                       " , Thread 2: " + previous.getName());
                }
        }
 
@@ -412,13 +402,20 @@ private void exitExclusiveThread() {
                currentThread = null;
        }
 
+       Schema getAvroSchema() {
+               checkAvroInitialized();
+               return runtimeSchema;
+       }
+
        // 
------------------------------------------------------------------------
        //  Serializer Snapshots
        // 
------------------------------------------------------------------------
 
        /**
         * A config snapshot for the Avro Serializer that stores the Avro 
Schema to check compatibility.
+        * This class is now deprecated and only kept for backward 
compatibility.
         */
+       @Deprecated
        public static final class AvroSchemaSerializerConfigSnapshot<T> extends 
TypeSerializerConfigSnapshot<T> {
 
                private String schemaString;
@@ -427,8 +424,13 @@ private void exitExclusiveThread() {
                 * Default constructor for instantiation via reflection.
                 */
                @SuppressWarnings("unused")
-               public AvroSchemaSerializerConfigSnapshot() {}
+               public AvroSchemaSerializerConfigSnapshot() {
+               }
 
+               /**
+                * AvroSerializer now uses the new {@link 
AvroSerializerSnapshot} class instead.
+                */
+               @SuppressWarnings("unused")
                public AvroSchemaSerializerConfigSnapshot(String schemaString) {
                        this.schemaString = checkNotNull(schemaString);
                }
@@ -485,47 +487,4 @@ public String toString() {
                }
        }
 
-       /**
-        * The outdated config snapshot, retained for backwards compatibility.
-        *
-        * @deprecated The {@link AvroSchemaSerializerConfigSnapshot} should be 
used instead.
-        */
-       @Deprecated
-       public static class AvroSerializerConfigSnapshot<T> extends 
KryoRegistrationSerializerConfigSnapshot<T> {
-
-               private static final int VERSION = 1;
-
-               private Class<? extends T> typeToInstantiate;
-
-               public AvroSerializerConfigSnapshot() {}
-
-               @Override
-               public void write(DataOutputView out) throws IOException {
-                       super.write(out);
-
-                       out.writeUTF(typeToInstantiate.getName());
-               }
-
-               @SuppressWarnings("unchecked")
-               @Override
-               public void read(DataInputView in) throws IOException {
-                       super.read(in);
-
-                       String classname = in.readUTF();
-                       try {
-                               typeToInstantiate = (Class<? extends T>) 
Class.forName(classname, true, getUserCodeClassLoader());
-                       } catch (ClassNotFoundException e) {
-                               throw new IOException("Cannot find requested 
class " + classname + " in classpath.", e);
-                       }
-               }
-
-               @Override
-               public int getVersion() {
-                       return VERSION;
-               }
-
-               public Class<? extends T> getTypeToInstantiate() {
-                       return typeToInstantiate;
-               }
-       }
 }
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
new file mode 100644
index 00000000000..cb549ebd98e
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static 
org.apache.flink.formats.avro.typeutils.AvroSerializer.isGenericRecord;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An {@code Avro} specific implementation of a {@link TypeSerializerSnapshot}.
+ *
+ * @param <T> The data type that the originating serializer of this 
configuration serializes.
+ */
+public final class AvroSerializerSnapshot<T> implements 
TypeSerializerSnapshot<T> {
+       private Class<T> runtimeType;
+       private Schema schema;
+       private Schema runtimeSchema;
+
+       @SuppressWarnings("WeakerAccess")
+       public AvroSerializerSnapshot() {
+               // this constructor is used when restoring from a checkpoint.
+       }
+
+       AvroSerializerSnapshot(Schema schema, Class<T> runtimeType) {
+               this.schema = schema;
+               this.runtimeType = runtimeType;
+       }
+
+       @Override
+       public int getCurrentVersion() {
+               return 1;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               checkNotNull(runtimeType);
+               checkNotNull(schema);
+
+               out.writeUTF(runtimeType.getName());
+               out.writeUTF(schema.toString(false));
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public void read(int readVersion, DataInputView in, ClassLoader 
userCodeClassLoader) throws IOException {
+               final String previousRuntimeTypeName = in.readUTF();
+               final String previousSchemaDefinition = in.readUTF();
+
+               this.runtimeType = findClassOrThrow(userCodeClassLoader, 
previousRuntimeTypeName);
+               this.schema = parseAvroSchema(previousSchemaDefinition);
+               this.runtimeSchema = tryExtractAvroSchema(userCodeClassLoader, 
runtimeType);
+       }
+
+       @Override
+       public <NS extends TypeSerializer<T>> 
TypeSerializerSchemaCompatibility<T, NS>
+       resolveSchemaCompatibility(NS newSerializer) {
+               if (!(newSerializer instanceof AvroSerializer)) {
+                       return TypeSerializerSchemaCompatibility.incompatible();
+               }
+               AvroSerializer<?> newAvroSerializer = (AvroSerializer<?>) 
newSerializer;
+               return resolveSchemaCompatibility(schema, 
newAvroSerializer.getAvroSchema());
+       }
+
+       @Override
+       public TypeSerializer<T> restoreSerializer() {
+               checkNotNull(runtimeType);
+               checkNotNull(schema);
+
+               if (runtimeSchema != null) {
+                       return new AvroSerializer<>(runtimeType, new 
SerializableAvroSchema(runtimeSchema), new SerializableAvroSchema(schema));
+               }
+               else {
+                       return new AvroSerializer<>(runtimeType, new 
SerializableAvroSchema(schema), new SerializableAvroSchema(schema));
+               }
+       }
+
+       // 
------------------------------------------------------------------------------------------------------------
+       // Helpers
+       // 
------------------------------------------------------------------------------------------------------------
+
+       /**
+        * Resolves writer/reader schema compatibility.
+        *
+        * <p>Checks whether a new version of a schema (reader) can read 
values serialized with the old schema (writer).
+        * Compatibility is determined according to {@code Avro} schema 
resolution rules
+        * (see <a 
href="https://avro.apache.org/docs/current/spec.html#Schema+Resolution">Schema 
Resolution</a>).
+        */
+       @VisibleForTesting
+       static <T, NS extends TypeSerializer<T>> 
TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(
+               Schema writerSchema,
+               Schema readerSchema) {
+
+               if (Objects.equals(writerSchema, readerSchema)) {
+                       return 
TypeSerializerSchemaCompatibility.compatibleAsIs();
+               }
+
+               final SchemaPairCompatibility compatibility =
+                       
SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);
+
+               return avroCompatibilityToFlinkCompatibility(compatibility);
+       }
+
+       private static <T, NS extends TypeSerializer<T>> 
TypeSerializerSchemaCompatibility<T, NS>
+       avroCompatibilityToFlinkCompatibility(SchemaPairCompatibility 
compatibility) {
+               switch (compatibility.getType()) {
+                       case COMPATIBLE: {
+                               // The new serializer would be able to read 
data persisted with *this* serializer, therefore no migration
+                               // is required.
+                               return 
TypeSerializerSchemaCompatibility.compatibleAsIs();
+                       }
+                       case INCOMPATIBLE: {
+                               return 
TypeSerializerSchemaCompatibility.incompatible();
+                       }
+                       case RECURSION_IN_PROGRESS:
+                       default:
+                               return 
TypeSerializerSchemaCompatibility.incompatible();
+               }
+       }
+
+       private static Schema parseAvroSchema(String previousSchemaDefinition) {
+               Schema.Parser parser = new Schema.Parser();
+               return parser.parse(previousSchemaDefinition);
+       }
+
+       private static Schema tryExtractAvroSchema(ClassLoader cl, Class<?> 
runtimeType) {
+               if (isGenericRecord(runtimeType)) {
+                       return null;
+               }
+               if (isSpecificRecord(runtimeType)) {
+                       SpecificData d = new SpecificData(cl);
+                       return d.getSchema(runtimeType);
+               }
+               ReflectData d = new ReflectData(cl);
+               return d.getSchema(runtimeType);
+       }
+
+       @SuppressWarnings("unchecked")
+       private static <T> Class<T> findClassOrThrow(ClassLoader 
userCodeClassLoader, String className) {
+               try {
+                       Class<?> runtimeTarget = Class.forName(className, 
false, userCodeClassLoader);
+                       return (Class<T>) runtimeTarget;
+               }
+               catch (ClassNotFoundException e) {
+                       throw new IllegalStateException(""
+                               + "Unable to find the class '" + className + "' 
which is used to deserialize "
+                               + "the elements of this serializer. "
+                               + "Were the class was moved or renamed?", e);
+               }
+       }
+
+       private static boolean isSpecificRecord(Class<?> runtimeType) {
+               return SpecificRecord.class.isAssignableFrom(runtimeType);
+       }
+}
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
index 644ee50d361..09ce1868707 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
@@ -33,7 +33,6 @@
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Special type information to generate a special AvroTypeInfo for Avro POJOs 
(implementing SpecificRecordBase, the typed Avro POJOs)
@@ -50,52 +49,22 @@
 
        private static final long serialVersionUID = 1L;
 
-       private static final ConcurrentHashMap<Thread, Boolean> 
IN_BACKWARDS_COMPATIBLE_MODE = new ConcurrentHashMap<>();
-
-       private final boolean useBackwardsCompatibleSerializer;
-
        /**
         * Creates a new Avro type info for the given class.
         */
        public AvroTypeInfo(Class<T> typeClass) {
-               this(typeClass, false);
-       }
-
-       /**
-        * Creates a new Avro type info for the given class.
-        *
-        * <p>This constructor takes a flag to specify whether a serializer
-        * that is backwards compatible with PoJo-style serialization of Avro 
types should be used.
-        * That is only necessary, if one has a Flink 1.3 (or earlier) 
savepoint where Avro types
-        * were stored in the checkpointed state. New Flink programs will never 
need this.
-        */
-       public AvroTypeInfo(Class<T> typeClass, boolean 
useBackwardsCompatibleSerializer) {
-               super(typeClass, generateFieldsFromAvroSchema(typeClass, 
useBackwardsCompatibleSerializer));
-
-               final Boolean modeOnStack = 
IN_BACKWARDS_COMPATIBLE_MODE.get(Thread.currentThread());
-               this.useBackwardsCompatibleSerializer = modeOnStack == null ?
-                               useBackwardsCompatibleSerializer : modeOnStack;
+               super(typeClass, generateFieldsFromAvroSchema(typeClass));
        }
 
        @Override
        @SuppressWarnings("deprecation")
        public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-               return useBackwardsCompatibleSerializer ?
-                               new 
BackwardsCompatibleAvroSerializer<>(getTypeClass()) :
-                               new AvroSerializer<>(getTypeClass());
+               return new AvroSerializer<>(getTypeClass());
        }
 
        @SuppressWarnings("unchecked")
        @Internal
-       public static <T extends SpecificRecordBase> List<PojoField> 
generateFieldsFromAvroSchema(
-                       Class<T> typeClass,
-                       boolean useBackwardsCompatibleSerializer) {
-
-               final Thread currentThread = Thread.currentThread();
-               final boolean entryPoint =
-                               
IN_BACKWARDS_COMPATIBLE_MODE.putIfAbsent(currentThread, 
useBackwardsCompatibleSerializer) == null;
-
-               try {
+       private static <T extends SpecificRecordBase> List<PojoField> 
generateFieldsFromAvroSchema(Class<T> typeClass) {
                        PojoTypeExtractor pte = new PojoTypeExtractor();
                        ArrayList<Type> typeHierarchy = new ArrayList<>();
                        typeHierarchy.add(typeClass);
@@ -121,12 +90,6 @@ public AvroTypeInfo(Class<T> typeClass, boolean 
useBackwardsCompatibleSerializer
                                newFields.add(newField);
                        }
                        return newFields;
-               }
-               finally {
-                       if (entryPoint) {
-                               
IN_BACKWARDS_COMPATIBLE_MODE.remove(currentThread);
-                       }
-               }
        }
 
        private static class PojoTypeExtractor extends TypeExtractor {
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
deleted file mode 100644
index 63b79b2d064..00000000000
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.avro.typeutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import 
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
-import 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import 
org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot;
-import 
org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot;
-
-import org.apache.avro.specific.SpecificRecordBase;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * An Avro serializer that can switch back to a KryoSerializer or a Pojo 
Serializer, if
- * it has to ensure compatibility with one of those.
- *
- * <p>This serializer is there only as a means to explicitly fall back to PoJo 
serialization
- * in the case where an upgrade from an earlier savepoint was made.
- *
- * @param <T> The type to be serialized.
- */
-@SuppressWarnings("deprecation")
-public class BackwardsCompatibleAvroSerializer<T> extends TypeSerializer<T> {
-
-       private static final long serialVersionUID = 1L;
-
-       /** The type to serialize. */
-       private final Class<T> type;
-
-       /** The type serializer currently used. Avro by default. */
-       private TypeSerializer<T> serializer;
-
-       /**
-        * Creates a new backwards-compatible Avro Serializer, for the given 
type.
-        */
-       public BackwardsCompatibleAvroSerializer(Class<T> type) {
-               this.type = type;
-               this.serializer = new AvroSerializer<>(type);
-       }
-
-       /**
-        * Private copy constructor.
-        */
-       private BackwardsCompatibleAvroSerializer(Class<T> type, 
TypeSerializer<T> serializer) {
-               this.type = type;
-               this.serializer = serializer;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Properties
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public boolean isImmutableType() {
-               return serializer.isImmutableType();
-       }
-
-       @Override
-       public int getLength() {
-               return serializer.getLength();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Serialization
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public T createInstance() {
-               return serializer.createInstance();
-       }
-
-       @Override
-       public void serialize(T value, DataOutputView target) throws 
IOException {
-               serializer.serialize(value, target);
-       }
-
-       @Override
-       public T deserialize(DataInputView source) throws IOException {
-               return serializer.deserialize(source);
-       }
-
-       @Override
-       public T deserialize(T reuse, DataInputView source) throws IOException {
-               return serializer.deserialize(reuse, source);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Copying
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public T copy(T from) {
-               return serializer.copy(from);
-       }
-
-       @Override
-       public T copy(T from, T reuse) {
-               return serializer.copy(from, reuse);
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               serializer.copy(source, target);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public TypeSerializer<T> duplicate() {
-               return new BackwardsCompatibleAvroSerializer<>(type, 
serializer.duplicate());
-       }
-
-       @Override
-       public int hashCode() {
-               return type.hashCode();
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj == this) {
-                       return true;
-               }
-               else if (obj != null && obj.getClass() == 
BackwardsCompatibleAvroSerializer.class) {
-                       final BackwardsCompatibleAvroSerializer that = 
(BackwardsCompatibleAvroSerializer) obj;
-                       return this.type == that.type && 
this.serializer.equals(that.serializer);
-               }
-               else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj.getClass() == this.getClass();
-       }
-
-       @Override
-       public String toString() {
-               return getClass().getName() + " (" + type.getName() + ')';
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Configuration Snapshots and Upgrades
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public TypeSerializerSnapshot<T> snapshotConfiguration() {
-               // we return the configuration of the actually used serializer 
here
-               return serializer.snapshotConfiguration();
-       }
-
-       @Override
-       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-               if (configSnapshot instanceof 
AvroSchemaSerializerConfigSnapshot ||
-                               configSnapshot instanceof 
AvroSerializerConfigSnapshot) {
-
-                       // avro serializer, nice :-)
-                       checkState(serializer instanceof AvroSerializer,
-                                       "Serializer was changed backwards to 
PojoSerializer and now encounters AvroSerializer snapshot.");
-
-                       return serializer.ensureCompatibility(configSnapshot);
-               }
-               else if (configSnapshot instanceof 
PojoSerializerConfigSnapshot) {
-                       // common previous case
-                       
checkState(SpecificRecordBase.class.isAssignableFrom(type),
-                                       "BackwardsCompatibleAvroSerializer 
resuming a state serialized " +
-                                                       "via a PojoSerializer, 
but not for an Avro Specific Record");
-
-                       final AvroTypeInfo<? extends SpecificRecordBase> 
typeInfo =
-                                       new 
AvroTypeInfo<>(type.asSubclass(SpecificRecordBase.class), true);
-
-                       @SuppressWarnings("unchecked")
-                       final TypeSerializer<T> pojoSerializer =
-                                       (TypeSerializer<T>) 
typeInfo.createPojoSerializer(new ExecutionConfig());
-                       this.serializer = pojoSerializer;
-                       return serializer.ensureCompatibility(configSnapshot);
-               }
-               else if (configSnapshot instanceof 
KryoRegistrationSerializerConfigSnapshot) {
-                       // force-kryo old case common previous case
-                       // we create a new Kryo Serializer with a blank 
execution config.
-                       // registrations are anyways picked up from the 
snapshot.
-                       serializer = new KryoSerializer<>(type, new 
ExecutionConfig());
-                       return serializer.ensureCompatibility(configSnapshot);
-               }
-               else {
-                       // completely incompatible type, needs migration
-                       return CompatibilityResult.requiresMigration();
-               }
-       }
-}
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java
new file mode 100644
index 00000000000..fb7114489d6
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Parser;
+import org.apache.avro.reflect.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * A wrapper for Avro {@link Schema}, that is Java serializable.
+ */
+@Internal
+final class SerializableAvroSchema implements Serializable {
+
+       private static final long serialVersionUID = 1;
+
+       private transient @Nullable Schema schema;
+
+       SerializableAvroSchema() {
+       }
+
+       SerializableAvroSchema(Schema schema) {
+               this.schema = schema;
+       }
+
+       Schema getAvroSchema() {
+               return schema;
+       }
+
+       private void writeObject(ObjectOutputStream oos) throws IOException {
+               if (schema == null) {
+                       oos.writeBoolean(false);
+               }
+               else {
+                       oos.writeBoolean(true);
+                       oos.writeUTF(schema.toString(false));
+               }
+       }
+
+       private void readObject(ObjectInputStream ois) throws 
ClassNotFoundException, IOException {
+               if (ois.readBoolean()) {
+                       String schema = ois.readUTF();
+                       this.schema = new Parser().parse(schema);
+               }
+               else {
+                       this.schema = null;
+               }
+       }
+}
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java
new file mode 100644
index 00000000000..a6d98dccdc4
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.utils.TestDataGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericRecord;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.function.Function;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test {@link AvroSerializerSnapshot}.
+ */
+public class AvroSerializerSnapshotTest {
+
+       private static final Schema FIRST_NAME = SchemaBuilder.record("name")
+               .namespace("org.apache.flink")
+               .fields()
+               .requiredString("first")
+               .endRecord();
+
+       private static final Schema FIRST_REQUIRED_LAST_OPTIONAL = 
SchemaBuilder.record("name")
+               .namespace("org.apache.flink")
+               .fields()
+               .requiredString("first")
+               .optionalString("last")
+               .endRecord();
+
+       private static final Schema BOTH_REQUIRED = SchemaBuilder.record("name")
+               .namespace("org.apache.flink")
+               .fields()
+               .requiredString("first")
+               .requiredString("last")
+               .endRecord();
+
+       @Test
+       public void sameSchemaShouldBeCompatibleAsIs() {
+               
assertThat(AvroSerializerSnapshot.resolveSchemaCompatibility(FIRST_NAME, 
FIRST_NAME), isCompatibleAsIs());
+       }
+
+       @Test
+       public void removingAnOptionalFieldsIsCompatibleAsIs() {
+               
assertThat(AvroSerializerSnapshot.resolveSchemaCompatibility(FIRST_REQUIRED_LAST_OPTIONAL,
 FIRST_NAME),
+                       isCompatibleAsIs());
+       }
+
+       @Test
+       public void addingAnOptionalFieldsIsCompatibleAsIs() {
+               
assertThat(AvroSerializerSnapshot.resolveSchemaCompatibility(FIRST_NAME, 
FIRST_REQUIRED_LAST_OPTIONAL),
+                       isCompatibleAsIs());
+       }
+
+       @Test
+       public void addingARequiredMakesSerializersIncompatible() {
+               
assertThat(AvroSerializerSnapshot.resolveSchemaCompatibility(FIRST_REQUIRED_LAST_OPTIONAL,
 BOTH_REQUIRED),
+                       isIncompatible());
+       }
+
+       @Test
+       public void anAvroSnapshotIsCompatibleWithItsOriginatingSerializer() {
+               AvroSerializer<GenericRecord> serializer =
+                       new AvroSerializer<>(GenericRecord.class, 
FIRST_REQUIRED_LAST_OPTIONAL);
+
+               TypeSerializerSnapshot<GenericRecord> snapshot = 
serializer.snapshotConfiguration();
+
+               assertThat(snapshot.resolveSchemaCompatibility(serializer), 
isCompatibleAsIs());
+       }
+
+       @Test
+       public void anAvroSnapshotIsCompatibleAfterARoundTrip() throws 
IOException {
+               AvroSerializer<GenericRecord> serializer =
+                       new AvroSerializer<>(GenericRecord.class, 
FIRST_REQUIRED_LAST_OPTIONAL);
+
+               AvroSerializerSnapshot<GenericRecord> restored = 
roundTrip(serializer.snapshotConfiguration());
+
+               assertThat(restored.resolveSchemaCompatibility(serializer), 
isCompatibleAsIs());
+       }
+
+       @Test
+       public void anAvroSpecificRecordIsCompatibleAfterARoundTrip() throws 
IOException {
+               // user is an avro generated test object.
+               AvroSerializer<User> serializer = new 
AvroSerializer<>(User.class);
+
+               AvroSerializerSnapshot<User> restored = 
roundTrip(serializer.snapshotConfiguration());
+
+               assertThat(restored.resolveSchemaCompatibility(serializer), 
isCompatibleAsIs());
+       }
+
+       @Test
+       public void aPojoIsCompatibleAfterARoundTrip() throws IOException {
+               AvroSerializer<Pojo> serializer = new 
AvroSerializer<>(Pojo.class);
+
+               AvroSerializerSnapshot<Pojo> restored = 
roundTrip(serializer.snapshotConfiguration());
+
+               assertThat(restored.resolveSchemaCompatibility(serializer), 
isCompatibleAsIs());
+       }
+
+       @Test
+       public void 
recordSerializedShouldBeDeserializeWithTheResortedSerializer() throws 
IOException {
+               // user is an avro generated test object.
+               final User user = TestDataGenerator.generateRandomUser(new 
Random());
+               final AvroSerializer<User> originalSerializer = new 
AvroSerializer<>(User.class);
+               //
+               // first serialize the record
+               //
+               ByteBuffer serializedUser = serialize(originalSerializer, user);
+               //
+               // then restore a serializer from the snapshot
+               //
+               TypeSerializer<User> restoredSerializer = 
originalSerializer.snapshotConfiguration().restoreSerializer();
+               //
+               // now deserialize the user with the resorted serializer.
+               //
+               User restoredUser = deserialize(restoredSerializer, 
serializedUser);
+
+               assertThat(user, is(restoredUser));
+       }
+
+       @Test
+       public void validSchemaEvaluationShouldResultInCompatibleSerializers() {
+               final AvroSerializer<GenericRecord> originalSerializer = new 
AvroSerializer<>(GenericRecord.class, FIRST_NAME);
+               final AvroSerializer<GenericRecord> newSerializer = new 
AvroSerializer<>(GenericRecord.class, FIRST_REQUIRED_LAST_OPTIONAL);
+
+               TypeSerializerSnapshot<GenericRecord> originalSnapshot = 
originalSerializer.snapshotConfiguration();
+
+               
assertThat(originalSnapshot.resolveSchemaCompatibility(newSerializer), 
isCompatibleAsIs());
+       }
+
+       @Test
+       public void 
nonValidSchemaEvaluationShouldResultInCompatibleSerializers() {
+               final AvroSerializer<GenericRecord> originalSerializer = new 
AvroSerializer<>(GenericRecord.class, FIRST_REQUIRED_LAST_OPTIONAL);
+               final AvroSerializer<GenericRecord> newSerializer = new 
AvroSerializer<>(GenericRecord.class, BOTH_REQUIRED);
+
+               TypeSerializerSnapshot<GenericRecord> originalSnapshot = 
originalSerializer.snapshotConfiguration();
+
+               
assertThat(originalSnapshot.resolveSchemaCompatibility(newSerializer), 
isIncompatible());
+       }
+
+       @Test
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       public void 
changingFromGenericToSpecificWithCompatibleSchemaShouldResultInCompatibleSerializers()
 {
+               // starting with a generic serializer
+               AvroSerializer<Object> generic = new 
AvroSerializer(GenericRecord.class, User.SCHEMA$);
+               TypeSerializerSnapshot<Object> genericSnapshot = 
generic.snapshotConfiguration();
+
+               // then upgrading to a specific serializer
+               AvroSerializer<Object> specificSerializer = new 
AvroSerializer(User.class);
+               specificSerializer.snapshotConfiguration();
+
+               
assertThat(genericSnapshot.resolveSchemaCompatibility(specificSerializer), 
isCompatibleAsIs());
+       }
+
+       // 
---------------------------------------------------------------------------------------------------------------
+       // Matchers
+       // 
---------------------------------------------------------------------------------------------------------------
+
+       private Matcher<TypeSerializerSchemaCompatibility> isCompatibleAsIs() {
+               return 
matcher(TypeSerializerSchemaCompatibility::isCompatibleAsIs, "compatible as 
is");
+       }
+
+       private Matcher<TypeSerializerSchemaCompatibility> 
isCompatibleAfterMigration() {
+               return 
matcher(TypeSerializerSchemaCompatibility::isCompatibleAfterMigration,
+                       "compatible after migration");
+       }
+
+       private Matcher<TypeSerializerSchemaCompatibility> isIncompatible() {
+               return 
matcher(TypeSerializerSchemaCompatibility::isIncompatible,
+                       "incompatible");
+       }
+
+       private static <T> Matcher<T> matcher(Function<T, Boolean> predicate, 
String message) {
+               return new TypeSafeDiagnosingMatcher<T>() {
+
+                       @Override
+                       protected boolean matchesSafely(T item, Description 
mismatchDescription) {
+                               if (predicate.apply(item)) {
+                                       return true;
+                               }
+                               mismatchDescription.appendText("not 
").appendText(message);
+                               return false;
+                       }
+
+                       @Override
+                       public void describeTo(Description description) {
+                       }
+               };
+       }
+
+       // 
---------------------------------------------------------------------------------------------------------------
+       // Utils
+       // 
---------------------------------------------------------------------------------------------------------------
+
+       /**
+        * Serialize an (avro)TypeSerializerSnapshot and deserialize it.
+        */
+       private static <T> AvroSerializerSnapshot<T> 
roundTrip(TypeSerializerSnapshot<T> original) throws IOException {
+               // write
+               DataOutputSerializer out = new DataOutputSerializer(1024);
+               original.write(out);
+
+               // init
+               AvroSerializerSnapshot<T> restored = new 
AvroSerializerSnapshot<>();
+
+               // read
+               DataInputView in = new 
DataInputDeserializer(out.wrapAsByteBuffer());
+               restored.read(restored.getCurrentVersion(), in, 
original.getClass().getClassLoader());
+
+               return restored;
+       }
+
+       private static <T> ByteBuffer serialize(TypeSerializer<T> serializer, T 
record) throws IOException {
+               DataOutputSerializer out = new DataOutputSerializer(1024);
+               serializer.serialize(record, out);
+               return out.wrapAsByteBuffer();
+       }
+
+       private static <T> T deserialize(TypeSerializer<T> serializer, 
ByteBuffer serializedRecord) throws IOException {
+               DataInputView in = new DataInputDeserializer(serializedRecord);
+               return serializer.deserialize(in);
+       }
+
+       // 
---------------------------------------------------------------------------------------------------------------
+       // Test classes
+       // 
---------------------------------------------------------------------------------------------------------------
+
+       private static class Pojo {
+               private String foo;
+
+               public String getFoo() {
+                       return foo;
+               }
+
+               public void setFoo(String foo) {
+                       this.foo = foo;
+               }
+       }
+}
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
deleted file mode 100644
index 7b8763bfa2f..00000000000
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.avro.typeutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.formats.avro.generated.SimpleUser;
-import org.apache.flink.formats.avro.utils.TestDataGenerator;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * This test ensures that state and state configuration created by Flink 1.3 
Avro types
- * that used the PojoSerializer still works (in most cases, see notice below).
- *
- * <p><b>Important:</b> Since Avro itself broke class compatibility between 
1.7.7 (used in Flink 1.3)
- * and 1.8.2 (used in Flink 1.4), the Avro by Pojo compatibility is broken 
through Avro already.
- * This test only tests that the Avro serializer change (switching from Pojo 
to Avro for Avro types)
- * works properly.
- *
- * <p>This test can be dropped once we drop backwards compatibility with Flink 
1.3 snapshots.
- *
- * <p>The {@link BackwardsCompatibleAvroSerializer} does not support custom 
Kryo registrations (which
- * logical types require for Avro 1.8 because Kryo does not support 
Joda-Time). We introduced a
- * simpler user record for pre-Avro 1.8 test cases.
- */
-public class BackwardsCompatibleAvroSerializerTest {
-
-       private static final String SNAPSHOT_RESOURCE = 
"flink-1.6-avro-type-serializer-snapshot";
-
-       private static final String DATA_RESOURCE = 
"flink-1.6-avro-type-serialized-data";
-
-       @SuppressWarnings("unused")
-       private static final String SNAPSHOT_RESOURCE_WRITER = 
"/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" + 
SNAPSHOT_RESOURCE;
-
-       @SuppressWarnings("unused")
-       private static final String DATA_RESOURCE_WRITER = 
"/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" + 
DATA_RESOURCE;
-
-       private static final long RANDOM_SEED = 143065108437678L;
-
-       private static final int NUM_DATA_ENTRIES = 20;
-
-       @Test
-       public void testCompatibilityWithPojoSerializer() throws Exception {
-
-               // retrieve the old config snapshot
-
-               final TypeSerializer<SimpleUser> serializer;
-               final TypeSerializerSnapshot<SimpleUser> configSnapshot;
-
-               try (InputStream in = 
getClass().getClassLoader().getResourceAsStream(SNAPSHOT_RESOURCE)) {
-                       DataInputViewStreamWrapper inView = new 
DataInputViewStreamWrapper(in);
-
-                       List<Tuple2<TypeSerializer<?>, 
TypeSerializerSnapshot<?>>> deserialized =
-                                       
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(
-                                                       inView, 
getClass().getClassLoader());
-
-                       assertEquals(1, deserialized.size());
-
-                       @SuppressWarnings("unchecked")
-                       final TypeSerializer<SimpleUser> typedSerializer = 
(TypeSerializer<SimpleUser>) deserialized.get(0).f0;
-
-                       serializer = typedSerializer;
-                       configSnapshot = (TypeSerializerSnapshot<SimpleUser>) 
deserialized.get(0).f1;
-               }
-
-               assertNotNull(serializer);
-               assertNotNull(configSnapshot);
-
-               assertTrue(serializer instanceof PojoSerializer);
-               assertTrue(configSnapshot instanceof 
PojoSerializer.PojoSerializerConfigSnapshot);
-
-               // sanity check for the test: check that the test data works 
with the original serializer
-               validateDeserialization(serializer);
-
-               // sanity check for the test: check that a PoJoSerializer and 
the original serializer work together
-               
assertFalse(serializer.ensureCompatibility(configSnapshot).isRequiresMigration());
-
-               final TypeSerializer<SimpleUser> newSerializer = new 
AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
-               
assertFalse(newSerializer.ensureCompatibility(configSnapshot).isRequiresMigration());
-
-               // deserialize the data and make sure this still works
-               validateDeserialization(newSerializer);
-
-               TypeSerializerSnapshot<SimpleUser> nextSnapshot = 
newSerializer.snapshotConfiguration();
-               final TypeSerializer<SimpleUser> nextSerializer = new 
AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
-
-               
assertFalse(nextSerializer.ensureCompatibility(nextSnapshot).isRequiresMigration());
-
-               // deserialize the data and make sure this still works
-               validateDeserialization(nextSerializer);
-       }
-
-       private static void validateDeserialization(TypeSerializer<SimpleUser> 
serializer) throws IOException {
-               final Random rnd = new Random(RANDOM_SEED);
-
-               try (InputStream in = 
BackwardsCompatibleAvroSerializerTest.class.getClassLoader()
-                               .getResourceAsStream(DATA_RESOURCE)) {
-
-                       final DataInputViewStreamWrapper inView = new 
DataInputViewStreamWrapper(in);
-
-                       for (int i = 0; i < NUM_DATA_ENTRIES; i++) {
-                               final SimpleUser deserialized = 
serializer.deserialize(inView);
-
-                               // deterministically generate a reference record
-                               final SimpleUser reference = 
TestDataGenerator.generateRandomSimpleUser(rnd);
-
-                               assertEquals(reference, deserialized);
-                       }
-               }
-       }
-
-// run this code to generate the test data
-//     public static void main(String[] args) throws Exception {
-//
-//             AvroTypeInfo<SimpleUser> typeInfo = new 
AvroTypeInfo<>(SimpleUser.class);
-//
-//             TypeSerializer<SimpleUser> serializer = 
typeInfo.createPojoSerializer(new ExecutionConfig());
-//             TypeSerializerConfigSnapshot confSnapshot = 
serializer.snapshotConfiguration();
-//
-//             try (FileOutputStream fos = new 
FileOutputStream(SNAPSHOT_RESOURCE_WRITER)) {
-//                     DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(fos);
-//
-//                     
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
-//                                     out,
-//                                     Collections.singletonList(
-//                                                     new 
Tuple2<>(serializer, confSnapshot)));
-//             }
-//
-//             try (FileOutputStream fos = new 
FileOutputStream(DATA_RESOURCE_WRITER)) {
-//                     final DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(fos);
-//                     final Random rnd = new Random(RANDOM_SEED);
-//
-//                     for (int i = 0; i < NUM_DATA_ENTRIES; i++) {
-//                             
serializer.serialize(TestDataGenerator.generateRandomSimpleUser(rnd), out);
-//                     }
-//             }
-//     }
-}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Upgrade AvroSerializer snapshot to implement new TypeSerializerSnapshot 
> interface
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-10605
>                 URL: https://issues.apache.org/jira/browse/FLINK-10605
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Type Serialization System
>            Reporter: Igal Shilman
>            Assignee: Igal Shilman
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> This issue introduces a new AvroSerializerSnapshot implementation that 
> conforms to the new TypeSerializerSnapshot API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to