Repository: flink
Updated Branches:
  refs/heads/master 88737cf9f -> c85f15ead


[FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State

This falls back to the original serializer (Pojo / Kryo) in cases where
an old snapshot is resumed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3a2197a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3a2197a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3a2197a

Branch: refs/heads/master
Commit: f3a2197a23524048200ae2b4712d6ed833208124
Parents: 88737cf
Author: Stephan Ewen <[email protected]>
Authored: Fri Nov 3 14:47:33 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Mon Nov 6 18:56:47 2017 +0100

----------------------------------------------------------------------
 .../flink/api/java/typeutils/PojoTypeInfo.java  |   6 +-
 .../flink/api/java/typeutils/AvroTypeInfo.java  |   2 +-
 .../java/typeutils/runtime/AvroSerializer.java  | 337 ++++++++++++++++
 .../formats/avro/typeutils/AvroSerializer.java  | 382 +++++++++++--------
 .../formats/avro/typeutils/AvroTypeInfo.java    |  91 +++--
 .../BackwardsCompatibleAvroSerializer.java      | 218 +++++++++++
 .../avro/typeutils/AvroSerializerTest.java      |  59 +++
 .../avro/typeutils/AvroTypeExtractionTest.java  |  14 +-
 .../avro/typeutils/AvroTypeInfoTest.java        |  12 +
 .../BackwardsCompatibleAvroSerializerTest.java  | 167 ++++++++
 .../formats/avro/utils/TestDataGenerator.java   | 120 ++++++
 .../flink-1.3-avro-type-serialized-data         | Bin 0 -> 23926 bytes
 .../flink-1.3-avro-type-serializer-snapshot     | Bin 0 -> 48089 bytes
 13 files changed, 1208 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 2e893bb..211b7ef 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -309,6 +309,10 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
                        return 
AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass());
                }
 
+               return createPojoSerializer(config);
+       }
+
+       public PojoSerializer<T> createPojoSerializer(ExecutionConfig config) {
                TypeSerializer<?>[] fieldSerializers = new 
TypeSerializer<?>[fields.length];
                Field[] reflectiveFields = new Field[fields.length];
 
@@ -319,7 +323,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 
                return new PojoSerializer<T>(getTypeClass(), fieldSerializers, 
reflectiveFields, config);
        }
-       
+
        @Override
        public boolean equals(Object obj) {
                if (obj instanceof PojoTypeInfo) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
index 58085f6..03bacfa 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
@@ -33,6 +33,6 @@ import static 
org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateField
 public class AvroTypeInfo<T extends SpecificRecordBase> extends 
PojoTypeInfo<T> {
 
        public AvroTypeInfo(Class<T> typeClass) {
-               super(typeClass, generateFieldsFromAvroSchema(typeClass));
+               super(typeClass, generateFieldsFromAvroSchema(typeClass, true));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
new file mode 100644
index 0000000..228e672
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.formats.avro.utils.DataInputDecoder;
+import org.apache.flink.formats.avro.utils.DataOutputEncoder;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.util.Utf8;
+
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Old deprecated Avro serializer. It is retained for a smoother experience 
when
+ * upgrading from an earlier Flink savepoint that stored this serializer.
+ */
+@Internal
+@Deprecated
+@SuppressWarnings({"unused", "deprecation"})
+public final class AvroSerializer<T> extends TypeSerializer<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final Class<T> type;
+
+       private final Class<? extends T> typeToInstantiate;
+
+       /**
+        * Map of class tag (using classname as tag) to their Kryo registration.
+        *
+        * <p>This map serves as a preview of the final registration result of
+        * the Kryo instance, taking into account registration overwrites.
+        */
+       private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
+
+       private transient ReflectDatumWriter<T> writer;
+       private transient ReflectDatumReader<T> reader;
+
+       private transient DataOutputEncoder encoder;
+       private transient DataInputDecoder decoder;
+
+       private transient Kryo kryo;
+
+       private transient T deepCopyInstance;
+
+       // 
--------------------------------------------------------------------------------------------
+
+       public AvroSerializer(Class<T> type) {
+               this(type, type);
+       }
+
+       public AvroSerializer(Class<T> type, Class<? extends T> 
typeToInstantiate) {
+               this.type = checkNotNull(type);
+               this.typeToInstantiate = checkNotNull(typeToInstantiate);
+
+               InstantiationUtil.checkForInstantiation(typeToInstantiate);
+
+               this.kryoRegistrations = buildKryoRegistrations(type);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public AvroSerializer<T> duplicate() {
+               return new AvroSerializer<>(type, typeToInstantiate);
+       }
+
+       @Override
+       public T createInstance() {
+               return InstantiationUtil.instantiate(this.typeToInstantiate);
+       }
+
+       @Override
+       public T copy(T from) {
+               checkKryoInitialized();
+
+               return KryoUtils.copy(from, kryo, this);
+       }
+
+       @Override
+       public T copy(T from, T reuse) {
+               checkKryoInitialized();
+
+               return KryoUtils.copy(from, reuse, kryo, this);
+       }
+
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+       @Override
+       public void serialize(T value, DataOutputView target) throws 
IOException {
+               checkAvroInitialized();
+               this.encoder.setOut(target);
+               this.writer.write(value, this.encoder);
+       }
+
+       @Override
+       public T deserialize(DataInputView source) throws IOException {
+               checkAvroInitialized();
+               this.decoder.setIn(source);
+               return this.reader.read(null, this.decoder);
+       }
+
+       @Override
+       public T deserialize(T reuse, DataInputView source) throws IOException {
+               checkAvroInitialized();
+               this.decoder.setIn(source);
+               return this.reader.read(reuse, this.decoder);
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               checkAvroInitialized();
+
+               if (this.deepCopyInstance == null) {
+                       this.deepCopyInstance = 
InstantiationUtil.instantiate(type, Object.class);
+               }
+
+               this.decoder.setIn(source);
+               this.encoder.setOut(target);
+
+               T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
+               this.writer.write(tmp, this.encoder);
+       }
+
+       private void checkAvroInitialized() {
+               if (this.reader == null) {
+                       this.reader = new ReflectDatumReader<>(type);
+                       this.writer = new ReflectDatumWriter<>(type);
+                       this.encoder = new DataOutputEncoder();
+                       this.decoder = new DataInputDecoder();
+               }
+       }
+
+       private void checkKryoInitialized() {
+               if (this.kryo == null) {
+                       this.kryo = new Kryo();
+
+                       Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
+                       
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
+                       kryo.setInstantiatorStrategy(instantiatorStrategy);
+
+                       kryo.setAsmEnabled(true);
+
+                       KryoUtils.applyRegistrations(kryo, 
kryoRegistrations.values());
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               return 31 * this.type.hashCode() + 
this.typeToInstantiate.hashCode();
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof AvroSerializer) {
+                       @SuppressWarnings("unchecked")
+                       AvroSerializer<T> avroSerializer = (AvroSerializer<T>) 
obj;
+
+                       return avroSerializer.canEqual(this) &&
+                                       type == avroSerializer.type &&
+                                       typeToInstantiate == 
avroSerializer.typeToInstantiate;
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof AvroSerializer;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
+               return new AvroSerializerConfigSnapshot<>(type, 
typeToInstantiate, kryoRegistrations);
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
+                       final AvroSerializerConfigSnapshot<T> config = 
(AvroSerializerConfigSnapshot<T>) configSnapshot;
+
+                       if (type.equals(config.getTypeClass()) && 
typeToInstantiate.equals(config.getTypeToInstantiate())) {
+                               // resolve Kryo registrations; currently, since 
the Kryo registrations in Avro
+                               // are fixed, there shouldn't be a problem with 
the resolution here.
+
+                               LinkedHashMap<String, KryoRegistration> 
oldRegistrations = config.getKryoRegistrations();
+                               oldRegistrations.putAll(kryoRegistrations);
+
+                               for (Map.Entry<String, KryoRegistration> 
reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
+                                       if 
(reconfiguredRegistrationEntry.getValue().isDummy()) {
+                                               return 
CompatibilityResult.requiresMigration();
+                                       }
+                               }
+
+                               this.kryoRegistrations = oldRegistrations;
+                               return CompatibilityResult.compatible();
+                       }
+               }
+
+               // ends up here if the preceding serializer is not
+               // the ValueSerializer, or serialized data type has changed
+               return CompatibilityResult.requiresMigration();
+       }
+
+       /**
+        * Config snapshot for this serializer.
+        */
+       public static class AvroSerializerConfigSnapshot<T> extends 
KryoRegistrationSerializerConfigSnapshot<T> {
+
+               private static final int VERSION = 1;
+
+               private Class<? extends T> typeToInstantiate;
+
+               public AvroSerializerConfigSnapshot() {}
+
+               public AvroSerializerConfigSnapshot(
+                               Class<T> baseType,
+                               Class<? extends T> typeToInstantiate,
+                               LinkedHashMap<String, KryoRegistration> 
kryoRegistrations) {
+
+                       super(baseType, kryoRegistrations);
+                       this.typeToInstantiate = 
Preconditions.checkNotNull(typeToInstantiate);
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       super.write(out);
+
+                       out.writeUTF(typeToInstantiate.getName());
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       super.read(in);
+
+                       String classname = in.readUTF();
+                       try {
+                               typeToInstantiate = (Class<? extends T>) 
Class.forName(classname, true, getUserCodeClassLoader());
+                       } catch (ClassNotFoundException e) {
+                               throw new IOException("Cannot find requested 
class " + classname + " in classpath.", e);
+                       }
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+
+               public Class<? extends T> getTypeToInstantiate() {
+                       return typeToInstantiate;
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               in.defaultReadObject();
+
+               // kryoRegistrations may be null if this Avro serializer is 
deserialized from an old version
+               if (kryoRegistrations == null) {
+                       this.kryoRegistrations = buildKryoRegistrations(type);
+               }
+       }
+
+       private static <T> LinkedHashMap<String, KryoRegistration> 
buildKryoRegistrations(Class<T> serializedDataType) {
+               final LinkedHashMap<String, KryoRegistration> registrations = 
new LinkedHashMap<>();
+
+               // register Avro types.
+               registrations.put(
+                               GenericData.Array.class.getName(),
+                               new KryoRegistration(
+                                               GenericData.Array.class,
+                                               new 
ExecutionConfig.SerializableSerializer<>(new 
Serializers.SpecificInstanceCollectionSerializerForArrayList())));
+               registrations.put(Utf8.class.getName(), new 
KryoRegistration(Utf8.class));
+               registrations.put(GenericData.EnumSymbol.class.getName(), new 
KryoRegistration(GenericData.EnumSymbol.class));
+               registrations.put(GenericData.Fixed.class.getName(), new 
KryoRegistration(GenericData.Fixed.class));
+               registrations.put(GenericData.StringType.class.getName(), new 
KryoRegistration(GenericData.StringType.class));
+
+               // register the serialized data type
+               registrations.put(serializedDataType.getName(), new 
KryoRegistration(serializedDataType));
+
+               return registrations;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
index 02f74f5..bc3369f 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -18,85 +18,93 @@
 
 package org.apache.flink.formats.avro.typeutils;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
 import 
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.formats.avro.utils.DataInputDecoder;
 import org.apache.flink.formats.avro.utils.DataOutputEncoder;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
 
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.avro.generic.GenericData;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
+import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.util.Utf8;
-import org.objenesis.strategy.StdInstantiatorStrategy;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * General purpose serialization. Currently using Apache Avro's 
Reflect-serializers for serialization and
- * Kryo for deep object copies. We want to change this to Kryo-only.
+ * A serializer that serializes types via Avro.
  *
- * @param <T> The type serialized.
+ * <p>The serializer supports both efficient specific record serialization for
+ * types generated via Avro, as well as serialization via reflection
+ * (ReflectDatumReader / -Writer). The serializer instantiates them depending 
on
+ * the class of the type it should serialize.
+ *
+ * @param <T> The type to be serialized.
  */
-@Internal
-public final class AvroSerializer<T> extends TypeSerializer<T> {
+public class AvroSerializer<T> extends TypeSerializer<T> {
 
        private static final long serialVersionUID = 1L;
 
-       private final Class<T> type;
+       // -------- configuration fields, serializable -----------
 
-       private final Class<? extends T> typeToInstantiate;
+       /** The class of the type that is serialized by this serializer. */
+       private final Class<T> type;
 
-       /**
-        * Map of class tag (using classname as tag) to their Kryo registration.
-        *
-        * <p>This map serves as a preview of the final registration result of
-        * the Kryo instance, taking into account registration overwrites.
-        */
-       private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
+       // -------- runtime fields, non-serializable, lazily initialized 
-----------
 
-       private transient ReflectDatumWriter<T> writer;
-       private transient ReflectDatumReader<T> reader;
+       private transient SpecificDatumWriter<T> writer;
+       private transient SpecificDatumReader<T> reader;
 
        private transient DataOutputEncoder encoder;
        private transient DataInputDecoder decoder;
 
-       private transient Kryo kryo;
+       private transient SpecificData avroData;
 
-       private transient T deepCopyInstance;
+       private transient Schema schema;
 
-       // 
--------------------------------------------------------------------------------------------
+       /** The serializer configuration snapshot, cached for efficiency. */
+       private transient AvroSchemaSerializerConfigSnapshot configSnapshot;
 
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new AvroSerializer for the type indicated by the given 
class.
+        */
        public AvroSerializer(Class<T> type) {
-               this(type, type);
+               this.type = checkNotNull(type);
        }
 
+       /**
+        * @deprecated Use {@link AvroSerializer#AvroSerializer(Class)} instead.
+        */
+       @Deprecated
+       @SuppressWarnings("unused")
        public AvroSerializer(Class<T> type, Class<? extends T> 
typeToInstantiate) {
-               this.type = checkNotNull(type);
-               this.typeToInstantiate = checkNotNull(typeToInstantiate);
+               this(type);
+       }
 
-               InstantiationUtil.checkForInstantiation(typeToInstantiate);
+       // 
------------------------------------------------------------------------
 
-               this.kryoRegistrations = buildKryoRegistrations(type);
+       public Class<T> getType() {
+               return type;
        }
 
-       // 
--------------------------------------------------------------------------------------------
+       // 
------------------------------------------------------------------------
+       //  Properties
+       // 
------------------------------------------------------------------------
 
        @Override
        public boolean isImmutableType() {
@@ -104,32 +112,17 @@ public final class AvroSerializer<T> extends 
TypeSerializer<T> {
        }
 
        @Override
-       public AvroSerializer<T> duplicate() {
-               return new AvroSerializer<T>(type, typeToInstantiate);
-       }
-
-       @Override
-       public T createInstance() {
-               return InstantiationUtil.instantiate(this.typeToInstantiate);
-       }
-
-       @Override
-       public T copy(T from) {
-               checkKryoInitialized();
-
-               return KryoUtils.copy(from, kryo, this);
+       public int getLength() {
+               return -1;
        }
 
-       @Override
-       public T copy(T from, T reuse) {
-               checkKryoInitialized();
-
-               return KryoUtils.copy(from, reuse, kryo, this);
-       }
+       // 
------------------------------------------------------------------------
+       //  Serialization
+       // 
------------------------------------------------------------------------
 
        @Override
-       public int getLength() {
-               return -1;
+       public T createInstance() {
+               return InstantiationUtil.instantiate(type);
        }
 
        @Override
@@ -153,111 +146,216 @@ public final class AvroSerializer<T> extends 
TypeSerializer<T> {
                return this.reader.read(reuse, this.decoder);
        }
 
+       // 
------------------------------------------------------------------------
+       //  Copying
+       // 
------------------------------------------------------------------------
+
        @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+       public T copy(T from) {
                checkAvroInitialized();
+               return avroData.deepCopy(schema, from);
+       }
 
-               if (this.deepCopyInstance == null) {
-                       this.deepCopyInstance = 
InstantiationUtil.instantiate(type, Object.class);
-               }
-
-               this.decoder.setIn(source);
-               this.encoder.setOut(target);
+       @Override
+       public T copy(T from, T reuse) {
+               return copy(from);
+       }
 
-               T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
-               this.writer.write(tmp, this.encoder);
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               T value = deserialize(source);
+               serialize(value, target);
        }
 
-       private void checkAvroInitialized() {
-               if (this.reader == null) {
-                       this.reader = new ReflectDatumReader<T>(type);
-                       this.writer = new ReflectDatumWriter<T>(type);
-                       this.encoder = new DataOutputEncoder();
-                       this.decoder = new DataInputDecoder();
+       // 
------------------------------------------------------------------------
+       //  Compatibility and Upgrades
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public TypeSerializerConfigSnapshot snapshotConfiguration() {
+               if (configSnapshot == null) {
+                       checkAvroInitialized();
+                       configSnapshot = new 
AvroSchemaSerializerConfigSnapshot(schema.toString(false));
                }
+               return configSnapshot;
        }
 
-       private void checkKryoInitialized() {
-               if (this.kryo == null) {
-                       this.kryo = new Kryo();
-
-                       Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
-                       
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
-                       kryo.setInstantiatorStrategy(instantiatorStrategy);
+       @Override
+       @SuppressWarnings("deprecation")
+       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof 
AvroSchemaSerializerConfigSnapshot) {
+                       // proper schema snapshot, can do the sophisticated 
schema-based compatibility check
+                       final String schemaString = 
((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
+                       final Schema lastSchema = new 
Schema.Parser().parse(schemaString);
 
-                       kryo.setAsmEnabled(true);
+                       checkAvroInitialized();
+                       final SchemaPairCompatibility compatibility =
+                                       
SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
 
-                       KryoUtils.applyRegistrations(kryo, 
kryoRegistrations.values());
+                       return compatibility.getType() == 
SchemaCompatibilityType.COMPATIBLE ?
+                                       CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
+               }
+               else if (configSnapshot instanceof 
AvroSerializerConfigSnapshot) {
+                       // old snapshot case, just compare the type
+                       // we don't need to restore any Kryo stuff, since Kryo 
was never used for persistence,
+                       // only for object-to-object copies.
+                       final AvroSerializerConfigSnapshot old = 
(AvroSerializerConfigSnapshot) configSnapshot;
+                       return type.equals(old.getTypeClass()) ?
+                                       CompatibilityResult.compatible() : 
CompatibilityResult.requiresMigration();
+               }
+               else {
+                       return CompatibilityResult.requiresMigration();
                }
        }
 
-       // 
--------------------------------------------------------------------------------------------
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public TypeSerializer<T> duplicate() {
+               return new AvroSerializer<>(type);
+       }
 
        @Override
        public int hashCode() {
-               return 31 * this.type.hashCode() + 
this.typeToInstantiate.hashCode();
+               return 42 + type.hashCode();
        }
 
        @Override
        public boolean equals(Object obj) {
-               if (obj instanceof AvroSerializer) {
-                       @SuppressWarnings("unchecked")
-                       AvroSerializer<T> avroSerializer = (AvroSerializer<T>) 
obj;
-
-                       return avroSerializer.canEqual(this) &&
-                               type == avroSerializer.type &&
-                               typeToInstantiate == 
avroSerializer.typeToInstantiate;
-               } else {
+               if (obj == this) {
+                       return true;
+               }
+               else if (obj != null && obj.getClass() == AvroSerializer.class) 
{
+                       final AvroSerializer that = (AvroSerializer) obj;
+                       return this.type == that.type;
+               }
+               else {
                        return false;
                }
        }
 
        @Override
        public boolean canEqual(Object obj) {
-               return obj instanceof AvroSerializer;
+               return obj.getClass() == this.getClass();
        }
 
-       // 
--------------------------------------------------------------------------------------------
-       // Serializer configuration snapshotting & compatibility
-       // 
--------------------------------------------------------------------------------------------
-
        @Override
-       public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
-               return new AvroSerializerConfigSnapshot<>(type, 
typeToInstantiate, kryoRegistrations);
+       public String toString() {
+               return getClass().getName() + " (" + getType().getName() + ')';
        }
 
-       @SuppressWarnings("unchecked")
-       @Override
-       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-               if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
-                       final AvroSerializerConfigSnapshot<T> config = 
(AvroSerializerConfigSnapshot<T>) configSnapshot;
+       // 
------------------------------------------------------------------------
+       //  Initialization
+       // 
------------------------------------------------------------------------
+
+       private void checkAvroInitialized() {
+               if (writer == null) {
+                       initializeAvro();
+               }
+       }
+
+       private void initializeAvro() {
+               final ClassLoader cl = 
Thread.currentThread().getContextClassLoader();
+
+               if (SpecificRecord.class.isAssignableFrom(type)) {
+                       this.avroData = new SpecificData(cl);
+                       this.schema = this.avroData.getSchema(type);
+                       this.reader = new SpecificDatumReader<>(schema, schema, 
avroData);
+                       this.writer = new SpecificDatumWriter<>(schema, 
avroData);
+               }
+               else {
+                       final ReflectData reflectData = new ReflectData(cl);
+                       this.avroData = reflectData;
+                       this.schema = this.avroData.getSchema(type);
+                       this.reader = new ReflectDatumReader<>(schema, schema, 
reflectData);
+                       this.writer = new ReflectDatumWriter<>(schema, 
reflectData);
+               }
+
+               this.encoder = new DataOutputEncoder();
+               this.decoder = new DataInputDecoder();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Serializer Snapshots
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A config snapshot for the Avro Serializer that stores the Avro 
Schema to check compatibility.
+        */
+       public static final class AvroSchemaSerializerConfigSnapshot extends 
TypeSerializerConfigSnapshot {
+
+               private String schemaString;
+
+               /**
+                * Default constructor for instantiation via reflection.
+                */
+               @SuppressWarnings("unused")
+               public AvroSchemaSerializerConfigSnapshot() {}
+
+               public AvroSchemaSerializerConfigSnapshot(String schemaString) {
+                       this.schemaString = checkNotNull(schemaString);
+               }
+
+               public String getSchemaString() {
+                       return schemaString;
+               }
+
+               // --- Serialization ---
+
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       super.read(in);
+                       this.schemaString = in.readUTF();
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       super.write(out);
+                       out.writeUTF(schemaString);
+               }
 
-                       if (type.equals(config.getTypeClass()) && 
typeToInstantiate.equals(config.getTypeToInstantiate())) {
-                               // resolve Kryo registrations; currently, since 
the Kryo registrations in Avro
-                               // are fixed, there shouldn't be a problem with 
the resolution here.
+               // --- Version ---
 
-                               LinkedHashMap<String, KryoRegistration> 
oldRegistrations = config.getKryoRegistrations();
-                               oldRegistrations.putAll(kryoRegistrations);
+               @Override
+               public int getVersion() {
+                       return 1;
+               }
 
-                               for (Map.Entry<String, KryoRegistration> 
reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
-                                       if 
(reconfiguredRegistrationEntry.getValue().isDummy()) {
-                                               return 
CompatibilityResult.requiresMigration();
-                                       }
-                               }
+               // --- Utils ---
 
-                               this.kryoRegistrations = oldRegistrations;
-                               return CompatibilityResult.compatible();
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj == this) {
+                               return true;
+                       }
+                       else if (obj != null && obj.getClass() == 
AvroSchemaSerializerConfigSnapshot.class) {
+                               final AvroSchemaSerializerConfigSnapshot that = 
(AvroSchemaSerializerConfigSnapshot) obj;
+                               return 
this.schemaString.equals(that.schemaString);
+                       }
+                       else {
+                               return false;
                        }
                }
 
-               // ends up here if the preceding serializer is not
-               // the ValueSerializer, or serialized data type has changed
-               return CompatibilityResult.requiresMigration();
+               @Override
+               public int hashCode() {
+                       return 11 + schemaString.hashCode();
+               }
+
+               @Override
+               public String toString() {
+                       return getClass().getName() + " (" + schemaString + ')';
+               }
        }
 
        /**
-        * {@link TypeSerializerConfigSnapshot} for Avro.
+        * The outdated config snapshot, retained for backwards compatibility.
+        *
+        * @deprecated The {@link AvroSchemaSerializerConfigSnapshot} should be 
used instead.
         */
+       @Deprecated
        public static class AvroSerializerConfigSnapshot<T> extends 
KryoRegistrationSerializerConfigSnapshot<T> {
 
                private static final int VERSION = 1;
@@ -266,15 +364,6 @@ public final class AvroSerializer<T> extends 
TypeSerializer<T> {
 
                public AvroSerializerConfigSnapshot() {}
 
-               public AvroSerializerConfigSnapshot(
-                       Class<T> baseType,
-                       Class<? extends T> typeToInstantiate,
-                       LinkedHashMap<String, KryoRegistration> 
kryoRegistrations) {
-
-                       super(baseType, kryoRegistrations);
-                       this.typeToInstantiate = 
Preconditions.checkNotNull(typeToInstantiate);
-               }
-
                @Override
                public void write(DataOutputView out) throws IOException {
                        super.write(out);
@@ -304,35 +393,4 @@ public final class AvroSerializer<T> extends 
TypeSerializer<T> {
                        return typeToInstantiate;
                }
        }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               in.defaultReadObject();
-
-               // kryoRegistrations may be null if this Avro serializer is 
deserialized from an old version
-               if (kryoRegistrations == null) {
-                       this.kryoRegistrations = buildKryoRegistrations(type);
-               }
-       }
-
-       private static <T> LinkedHashMap<String, KryoRegistration> 
buildKryoRegistrations(Class<T> serializedDataType) {
-               final LinkedHashMap<String, KryoRegistration> registrations = 
new LinkedHashMap<>();
-
-               // register Avro types.
-               registrations.put(
-                               GenericData.Array.class.getName(),
-                               new KryoRegistration(
-                                               GenericData.Array.class,
-                                               new 
ExecutionConfig.SerializableSerializer<>(new 
Serializers.SpecificInstanceCollectionSerializerForArrayList())));
-               registrations.put(Utf8.class.getName(), new 
KryoRegistration(Utf8.class));
-               registrations.put(GenericData.EnumSymbol.class.getName(), new 
KryoRegistration(GenericData.EnumSymbol.class));
-               registrations.put(GenericData.Fixed.class.getName(), new 
KryoRegistration(GenericData.Fixed.class));
-               registrations.put(GenericData.StringType.class.getName(), new 
KryoRegistration(GenericData.StringType.class));
-
-               // register the serialized data type
-               registrations.put(serializedDataType.getName(), new 
KryoRegistration(serializedDataType));
-
-               return registrations;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
index ad6b06e..644ee50 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
@@ -33,6 +33,7 @@ import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Special type information to generate a special AvroTypeInfo for Avro POJOs 
(implementing SpecificRecordBase, the typed Avro POJOs)
@@ -49,43 +50,83 @@ public class AvroTypeInfo<T extends SpecificRecordBase> 
extends PojoTypeInfo<T>
 
        private static final long serialVersionUID = 1L;
 
+       private static final ConcurrentHashMap<Thread, Boolean> 
IN_BACKWARDS_COMPATIBLE_MODE = new ConcurrentHashMap<>();
+
+       private final boolean useBackwardsCompatibleSerializer;
+
+       /**
+        * Creates a new Avro type info for the given class.
+        */
        public AvroTypeInfo(Class<T> typeClass) {
-               super(typeClass, generateFieldsFromAvroSchema(typeClass));
+               this(typeClass, false);
+       }
+
+       /**
+        * Creates a new Avro type info for the given class.
+        *
+        * <p>This constructor takes a flag to specify whether a serializer
+        * that is backwards compatible with PoJo-style serialization of Avro 
types should be used.
+        * That is only necessary, if one has a Flink 1.3 (or earlier) 
savepoint where Avro types
+        * were stored in the checkpointed state. New Flink programs will never 
need this.
+        */
+       public AvroTypeInfo(Class<T> typeClass, boolean 
useBackwardsCompatibleSerializer) {
+               super(typeClass, generateFieldsFromAvroSchema(typeClass, 
useBackwardsCompatibleSerializer));
+
+               final Boolean modeOnStack = 
IN_BACKWARDS_COMPATIBLE_MODE.get(Thread.currentThread());
+               this.useBackwardsCompatibleSerializer = modeOnStack == null ?
+                               useBackwardsCompatibleSerializer : modeOnStack;
        }
 
        @Override
+       @SuppressWarnings("deprecation")
        public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-               return super.createSerializer(config);
+               return useBackwardsCompatibleSerializer ?
+                               new 
BackwardsCompatibleAvroSerializer<>(getTypeClass()) :
+                               new AvroSerializer<>(getTypeClass());
        }
 
        @SuppressWarnings("unchecked")
        @Internal
-       public static <T extends SpecificRecordBase> List<PojoField> 
generateFieldsFromAvroSchema(Class<T> typeClass) {
-               PojoTypeExtractor pte = new PojoTypeExtractor();
-               ArrayList<Type> typeHierarchy = new ArrayList<>();
-               typeHierarchy.add(typeClass);
-               TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, 
null, null, null);
-
-               if (!(ti instanceof PojoTypeInfo)) {
-                       throw new IllegalStateException("Expecting type to be a 
PojoTypeInfo");
-               }
-               PojoTypeInfo pti =  (PojoTypeInfo) ti;
-               List<PojoField> newFields = new 
ArrayList<>(pti.getTotalFields());
-
-               for (int i = 0; i < pti.getArity(); i++) {
-                       PojoField f = pti.getPojoFieldAt(i);
-                       TypeInformation newType = f.getTypeInformation();
-                       // check if type is a CharSequence
-                       if (newType instanceof GenericTypeInfo) {
-                               if 
((newType).getTypeClass().equals(CharSequence.class)) {
-                                       // replace the type by a 
org.apache.avro.util.Utf8
-                                       newType = new 
GenericTypeInfo(org.apache.avro.util.Utf8.class);
+       public static <T extends SpecificRecordBase> List<PojoField> 
generateFieldsFromAvroSchema(
+                       Class<T> typeClass,
+                       boolean useBackwardsCompatibleSerializer) {
+
+               final Thread currentThread = Thread.currentThread();
+               final boolean entryPoint =
+                               
IN_BACKWARDS_COMPATIBLE_MODE.putIfAbsent(currentThread, 
useBackwardsCompatibleSerializer) == null;
+
+               try {
+                       PojoTypeExtractor pte = new PojoTypeExtractor();
+                       ArrayList<Type> typeHierarchy = new ArrayList<>();
+                       typeHierarchy.add(typeClass);
+                       TypeInformation ti = pte.analyzePojo(typeClass, 
typeHierarchy, null, null, null);
+
+                       if (!(ti instanceof PojoTypeInfo)) {
+                               throw new IllegalStateException("Expecting type 
to be a PojoTypeInfo");
+                       }
+                       PojoTypeInfo pti =  (PojoTypeInfo) ti;
+                       List<PojoField> newFields = new 
ArrayList<>(pti.getTotalFields());
+
+                       for (int i = 0; i < pti.getArity(); i++) {
+                               PojoField f = pti.getPojoFieldAt(i);
+                               TypeInformation newType = 
f.getTypeInformation();
+                               // check if type is a CharSequence
+                               if (newType instanceof GenericTypeInfo) {
+                                       if 
((newType).getTypeClass().equals(CharSequence.class)) {
+                                               // replace the type by a 
org.apache.avro.util.Utf8
+                                               newType = new 
GenericTypeInfo(org.apache.avro.util.Utf8.class);
+                                       }
                                }
+                               PojoField newField = new 
PojoField(f.getField(), newType);
+                               newFields.add(newField);
+                       }
+                       return newFields;
+               }
+               finally {
+                       if (entryPoint) {
+                               
IN_BACKWARDS_COMPATIBLE_MODE.remove(currentThread);
                        }
-                       PojoField newField = new PojoField(f.getField(), 
newType);
-                       newFields.add(newField);
                }
-               return newFields;
        }
 
        private static class PojoTypeExtractor extends TypeExtractor {

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
new file mode 100644
index 0000000..e5eb5d8
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import 
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import 
org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot;
+import 
org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An Avro serializer that can switch back to a KryoSerializer or a Pojo 
Serializer, if
+ * it has to ensure compatibility with one of those.
+ *
+ * <p>This serializer is there only as a means to explicitly fall back to PoJo 
serialization
+ * in the case where an upgrade from an earlier savepoint was made.
+ *
+ * @param <T> The type to be serialized.
+ */
+@SuppressWarnings("deprecation")
+public class BackwardsCompatibleAvroSerializer<T> extends TypeSerializer<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** The type to serialize. */
+       private final Class<T> type;
+
+       /** The type serializer currently used. Avro by default. */
+       private TypeSerializer<T> serializer;
+
+       /**
+        * Creates a new backwards-compatible Avro Serializer, for the given 
type.
+        */
+       public BackwardsCompatibleAvroSerializer(Class<T> type) {
+               this.type = type;
+               this.serializer = new AvroSerializer<>(type);
+       }
+
+       /**
+        * Private copy constructor.
+        */
+       private BackwardsCompatibleAvroSerializer(Class<T> type, 
TypeSerializer<T> serializer) {
+               this.type = type;
+               this.serializer = serializer;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Properties
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public boolean isImmutableType() {
+               return serializer.isImmutableType();
+       }
+
+       @Override
+       public int getLength() {
+               return serializer.getLength();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Serialization
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public T createInstance() {
+               return serializer.createInstance();
+       }
+
+       @Override
+       public void serialize(T value, DataOutputView target) throws 
IOException {
+               serializer.serialize(value, target);
+       }
+
+       @Override
+       public T deserialize(DataInputView source) throws IOException {
+               return serializer.deserialize(source);
+       }
+
+       @Override
+       public T deserialize(T reuse, DataInputView source) throws IOException {
+               return serializer.deserialize(reuse, source);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Copying
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public T copy(T from) {
+               return serializer.copy(from);
+       }
+
+       @Override
+       public T copy(T from, T reuse) {
+               return serializer.copy(from, reuse);
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               serializer.copy(source, target);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public TypeSerializer<T> duplicate() {
+               return new BackwardsCompatibleAvroSerializer<>(type, 
serializer.duplicate());
+       }
+
+       @Override
+       public int hashCode() {
+               return type.hashCode();
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               }
+               else if (obj != null && obj.getClass() == 
BackwardsCompatibleAvroSerializer.class) {
+                       final BackwardsCompatibleAvroSerializer that = 
(BackwardsCompatibleAvroSerializer) obj;
+                       return this.type == that.type && 
this.serializer.equals(that.serializer);
+               }
+               else {
+                       return false;
+               }
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj.getClass() == this.getClass();
+       }
+
+       @Override
+       public String toString() {
+               return getClass().getName() + " (" + type.getName() + ')';
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Configuration Snapshots and Upgrades
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public TypeSerializerConfigSnapshot snapshotConfiguration() {
+               // we return the configuration of the actually used serializer 
here
+               return serializer.snapshotConfiguration();
+       }
+
+       @Override
+       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof 
AvroSchemaSerializerConfigSnapshot ||
+                               configSnapshot instanceof 
AvroSerializerConfigSnapshot) {
+
+                       // avro serializer, nice :-)
+                       checkState(serializer instanceof AvroSerializer,
+                                       "Serializer was changed backwards to 
PojoSerializer and now encounters AvroSerializer snapshot.");
+
+                       return serializer.ensureCompatibility(configSnapshot);
+               }
+               else if (configSnapshot instanceof 
PojoSerializerConfigSnapshot) {
+                       // common previous case
+                       
checkState(SpecificRecordBase.class.isAssignableFrom(type),
+                                       "BackwardsCompatibleAvroSerializer 
resuming a state serialized " +
+                                                       "via a PojoSerializer, 
but not for an Avro Specific Record");
+
+                       final AvroTypeInfo<? extends SpecificRecordBase> 
typeInfo =
+                                       new 
AvroTypeInfo<>(type.asSubclass(SpecificRecordBase.class), true);
+
+                       @SuppressWarnings("unchecked")
+                       final TypeSerializer<T> pojoSerializer =
+                                       (TypeSerializer<T>) 
typeInfo.createPojoSerializer(new ExecutionConfig());
+                       this.serializer = pojoSerializer;
+                       return serializer.ensureCompatibility(configSnapshot);
+               }
+               else if (configSnapshot instanceof 
KryoRegistrationSerializerConfigSnapshot) {
+                       // force-kryo old case common previous case
+                       // we create a new Kryo Serializer with a blank 
execution config.
+                       // registrations are anyways picked up from the 
snapshot.
+                       serializer = new KryoSerializer<>(type, new 
ExecutionConfig());
+                       return serializer.ensureCompatibility(configSnapshot);
+               }
+               else {
+                       // completely incompatible type, needs migration
+                       return CompatibilityResult.requiresMigration();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java
new file mode 100644
index 0000000..0ab5868
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.utils.TestDataGenerator;
+
+import java.util.Random;
+
+/**
+ * Tests for the {@link AvroSerializer} that test specific avro types.
+ */
+public class AvroSerializerTest extends SerializerTestBase<User> {
+
+       @Override
+       protected TypeSerializer<User> createSerializer() {
+               return new AvroSerializer<>(User.class);
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected Class<User> getTypeClass() {
+               return User.class;
+       }
+
+       @Override
+       protected User[] getTestData() {
+               final Random rnd = new Random();
+               final User[] users = new User[20];
+
+               for (int i = 0; i < users.length; i++) {
+                       users[i] = TestDataGenerator.generateRandomUser(rnd);
+               }
+
+               return users;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
index ae41031..fbabb95 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
@@ -82,21 +82,14 @@ public class AvroTypeExtractionTest extends 
MultipleProgramsTestBase {
 
                AvroInputFormat<User> users = new AvroInputFormat<User>(in, 
User.class);
                DataSet<User> usersDS = env.createInput(users)
-                               // null map type because the order changes in 
different JVMs (hard to test)
-               .map(new MapFunction<User, User>() {
-                       @Override
-                       public User map(User value) throws Exception {
-                               value.setTypeMap(null);
-                               return value;
-                       }
-               });
+                               .map((value) -> value);
 
                usersDS.writeAsText(resultPath);
 
                env.execute("Simple Avro read job");
 
-               expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, 
\"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 
123.45, \"type_null_test\": null, \"type_bool_test\": true, 
\"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": 
[true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", 
\"type_map\": null, \"type_fixed\": null, \"type_union\": null, 
\"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": 
\"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
-                                       "{\"name\": \"Charlie\", 
\"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 
1337, \"type_double_test\": 1.337, \"type_null_test\": null, 
\"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": 
[], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, 
\"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, 
\"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", 
\"zip\": \"NW1 6XE\"}}\n";
+               expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, 
\"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 
123.45, \"type_null_test\": null, \"type_bool_test\": true, 
\"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": 
[true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", 
\"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, \"type_fixed\": null, 
\"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker 
Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 
6XE\"}}\n" +
+                                       "{\"name\": \"Charlie\", 
\"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 
1337, \"type_double_test\": 1.337, \"type_null_test\": null, 
\"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": 
[], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {}, 
\"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, 
\"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", 
\"zip\": \"NW1 6XE\"}}\n";
        }
 
        @Test
@@ -107,7 +100,6 @@ public class AvroTypeExtractionTest extends 
MultipleProgramsTestBase {
 
                AvroInputFormat<User> users = new AvroInputFormat<User>(in, 
User.class);
                DataSet<User> usersDS = env.createInput(users)
-                               // null map type because the order changes in 
different JVMs (hard to test)
                                .map(new MapFunction<User, User>() {
                                        @Override
                                        public User map(User value) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java
index 79a4a45..371cd4f 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java
@@ -18,10 +18,16 @@
 
 package org.apache.flink.formats.avro.typeutils;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.User;
 
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test for {@link AvroTypeInfo}.
  */
@@ -34,4 +40,10 @@ public class AvroTypeInfoTest extends 
TypeInformationTestBase<AvroTypeInfo<?>> {
                        new AvroTypeInfo<>(User.class),
                };
        }
+
+       @Test
+       public void testAvroByDefault() {
+               final TypeSerializer<User> serializer = new 
AvroTypeInfo<>(User.class).createSerializer(new ExecutionConfig());
+               assertTrue(serializer instanceof AvroSerializer);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
new file mode 100644
index 0000000..92395ba
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
+import 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.utils.TestDataGenerator;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test ensures that state and state configuration created by Flink 1.3 
Avro types
+ * that used the PojoSerializer still works.
+ *
+ * <p><b>Important:</b> Since Avro itself broke class compatibility between 
1.7.7 (used in Flink 1.3)
+ * and 1.8.2 (used in Flink 1.4), the Avro by Pojo compatibility is broken 
through Avro already.
+ * This test only tests that the Avro serializer change (switching from Pojo 
to Avro for Avro types)
+ * works properly.
+ *
+ * <p>This test can be dropped once we drop backwards compatibility with Flink 
1.3 snapshots.
+ */
+public class BackwardsCompatibleAvroSerializerTest {
+
+       private static final String SNAPSHOT_RESOURCE = 
"flink-1.3-avro-type-serializer-snapshot";
+
+       private static final String DATA_RESOURCE = 
"flink-1.3-avro-type-serialized-data";
+
+       @SuppressWarnings("unused")
+       private static final String SNAPSHOT_RESOURCE_WRITER = 
"/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" + 
SNAPSHOT_RESOURCE;
+
+       @SuppressWarnings("unused")
+       private static final String DATA_RESOURCE_WRITER = 
"/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" + 
DATA_RESOURCE;
+
+       private static final long RANDOM_SEED = 143065108437678L;
+
+       private static final int NUM_DATA_ENTRIES = 20;
+
+       @Test
+       public void testCompatibilityWithFlink_1_3() throws Exception {
+
+               // retrieve the old config snapshot
+
+               final TypeSerializer<User> serializer;
+               final TypeSerializerConfigSnapshot configSnapshot;
+
+               try (InputStream in = 
getClass().getClassLoader().getResourceAsStream(SNAPSHOT_RESOURCE)) {
+                       DataInputViewStreamWrapper inView = new 
DataInputViewStreamWrapper(in);
+
+                       List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> deserialized =
+                                       
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(
+                                                       inView, 
getClass().getClassLoader());
+
+                       assertEquals(1, deserialized.size());
+
+                       @SuppressWarnings("unchecked")
+                       final TypeSerializer<User> typedSerializer = 
(TypeSerializer<User>) deserialized.get(0).f0;
+
+                       serializer = typedSerializer;
+                       configSnapshot = deserialized.get(0).f1;
+               }
+
+               assertNotNull(serializer);
+               assertNotNull(configSnapshot);
+
+               assertTrue(serializer instanceof PojoSerializer);
+               assertTrue(configSnapshot instanceof 
PojoSerializerConfigSnapshot);
+
+               // sanity check for the test: check that the test data works 
with the original serializer
+               validateDeserialization(serializer);
+
+               // sanity check for the test: check that a PoJoSerializer and 
the original serializer work together
+               
assertFalse(serializer.ensureCompatibility(configSnapshot).isRequiresMigration());
+
+               final TypeSerializer<User> newSerializer = new 
AvroTypeInfo<>(User.class, true).createSerializer(new ExecutionConfig());
+               
assertFalse(newSerializer.ensureCompatibility(configSnapshot).isRequiresMigration());
+
+               // deserialize the data and make sure this still works
+               validateDeserialization(newSerializer);
+
+               TypeSerializerConfigSnapshot nextSnapshot = 
newSerializer.snapshotConfiguration();
+               final TypeSerializer<User> nextSerializer = new 
AvroTypeInfo<>(User.class, true).createSerializer(new ExecutionConfig());
+
+               
assertFalse(nextSerializer.ensureCompatibility(nextSnapshot).isRequiresMigration());
+
+               // deserialize the data and make sure this still works
+               validateDeserialization(nextSerializer);
+       }
+
+       private static void validateDeserialization(TypeSerializer<User> 
serializer) throws IOException {
+               final Random rnd = new Random(RANDOM_SEED);
+
+               try (InputStream in = 
BackwardsCompatibleAvroSerializerTest.class.getClassLoader()
+                               .getResourceAsStream(DATA_RESOURCE)) {
+
+                       final DataInputViewStreamWrapper inView = new 
DataInputViewStreamWrapper(in);
+
+                       for (int i = 0; i < NUM_DATA_ENTRIES; i++) {
+                               final User deserialized = 
serializer.deserialize(inView);
+
+                               // deterministically generate a reference record
+                               final User reference = 
TestDataGenerator.generateRandomUser(rnd);
+
+                               assertEquals(reference, deserialized);
+                       }
+               }
+       }
+
+// run this code on a 1.3 (or earlier) branch to generate the test data
+//     public static void main(String[] args) throws Exception {
+//
+//             AvroTypeInfo<User> typeInfo = new AvroTypeInfo<>(User.class);
+//
+//             TypeSerializer<User> serializer = 
typeInfo.createPojoSerializer(new ExecutionConfig());
+//             TypeSerializerConfigSnapshot confSnapshot = 
serializer.snapshotConfiguration();
+//
+//             try (FileOutputStream fos = new 
FileOutputStream(SNAPSHOT_RESOURCE_WRITER)) {
+//                     DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(fos);
+//
+//                     
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
+//                                     out,
+//                                     Collections.singletonList(
+//                                                     new 
Tuple2<>(serializer, confSnapshot)));
+//             }
+//
+//             try (FileOutputStream fos = new 
FileOutputStream(DATA_RESOURCE_WRITER)) {
+//                     final DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(fos);
+//                     final Random rnd = new Random(RANDOM_SEED);
+//
+//                     for (int i = 0; i < NUM_DATA_ENTRIES; i++) {
+//                             
serializer.serialize(TestDataGenerator.generateRandomUser(rnd), out);
+//                     }
+//             }
+//     }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
new file mode 100644
index 0000000..9a9061e
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.User;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Generator for random test data for the generated Avro User type.
+ */
+public class TestDataGenerator {
+
+       public static User generateRandomUser(Random rnd) {
+               return new User(
+                               generateRandomString(rnd, 50),
+                               rnd.nextBoolean() ? null : rnd.nextInt(),
+                               rnd.nextBoolean() ? null : 
generateRandomString(rnd, 6),
+                               rnd.nextBoolean() ? null : rnd.nextLong(),
+                               rnd.nextDouble(),
+                               null,
+                               rnd.nextBoolean(),
+                               generateRandomStringList(rnd, 20, 30),
+                               generateRandomBooleanList(rnd, 20),
+                               rnd.nextBoolean() ? null : 
generateRandomStringList(rnd, 20, 20),
+                               generateRandomColor(rnd),
+                               new HashMap<>(),
+                               generateRandomFixed16(rnd),
+                               generateRandomUnion(rnd),
+                               generateRandomAddress(rnd));
+       }
+
+       public static Colors generateRandomColor(Random rnd) {
+               return Colors.values()[rnd.nextInt(Colors.values().length)];
+       }
+
+       public static Fixed16 generateRandomFixed16(Random rnd) {
+               if (rnd.nextBoolean()) {
+                       return new Fixed16();
+               }
+               else {
+                       byte[] bytes = new byte[16];
+                       rnd.nextBytes(bytes);
+                       return new Fixed16(bytes);
+               }
+       }
+
+       public static Address generateRandomAddress(Random rnd) {
+               return new Address(
+                               rnd.nextInt(),
+                               generateRandomString(rnd, 20),
+                               generateRandomString(rnd, 20),
+                               generateRandomString(rnd, 20),
+                               generateRandomString(rnd, 20));
+       }
+
+       private static List<Boolean> generateRandomBooleanList(Random rnd, int 
maxEntries) {
+               final int num = rnd.nextInt(maxEntries + 1);
+               ArrayList<Boolean> list = new ArrayList<>();
+               for (int i = 0; i < num; i++) {
+                       list.add(rnd.nextBoolean());
+               }
+               return list;
+       }
+
+       private static List<CharSequence> generateRandomStringList(Random rnd, 
int maxEntries, int maxLen) {
+               final int num = rnd.nextInt(maxEntries + 1);
+               ArrayList<CharSequence> list = new ArrayList<>();
+               for (int i = 0; i < num; i++) {
+                       list.add(generateRandomString(rnd, maxLen));
+               }
+               return list;
+       }
+
+       private static String generateRandomString(Random rnd, int maxLen) {
+               char[] chars = new char[rnd.nextInt(maxLen + 1)];
+               for (int i = 0; i < chars.length; i++) {
+                       chars[i] = (char) rnd.nextInt(Character.MAX_VALUE);
+               }
+               return new String(chars);
+       }
+
+       private static Object generateRandomUnion(Random rnd) {
+               if (rnd.nextBoolean()) {
+                       if (rnd.nextBoolean()) {
+                               return null;
+                       } else {
+                               return rnd.nextBoolean();
+                       }
+               } else {
+                       if (rnd.nextBoolean()) {
+                               return rnd.nextLong();
+                       } else {
+                               return rnd.nextDouble();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
 
b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
new file mode 100644
index 0000000..028c1e6
Binary files /dev/null and 
b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
 differ

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot
----------------------------------------------------------------------
diff --git 
a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot
 
b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot
new file mode 100644
index 0000000..5bfdf728
Binary files /dev/null and 
b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot
 differ

Reply via email to