http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
deleted file mode 100644
index 6f03b12..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.util.Utf8;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.List;
-
-/**
- * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes.
- */
-public class AvroRowSerializationSchema implements SerializationSchema<Row> {
-
-       /**
-        * Avro record class.
-        */
-       private Class<? extends SpecificRecord> recordClazz;
-
-       /**
-        * Avro serialization schema.
-        */
-       private transient Schema schema;
-
-       /**
-        * Writer to serialize Avro record into a byte array.
-        */
-       private transient DatumWriter<GenericRecord> datumWriter;
-
-       /**
-        * Output stream to serialize records into byte array.
-        */
-       private transient ByteArrayOutputStream arrayOutputStream = new 
ByteArrayOutputStream();
-
-       /**
-        * Low-level class for serialization of Avro values.
-        */
-       private transient Encoder encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
-
-       /**
-        * Creates a Avro serialization schema for the given schema.
-        *
-        * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
-        */
-       public AvroRowSerializationSchema(Class<? extends SpecificRecord> 
recordClazz) {
-               Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
-               this.recordClazz = recordClazz;
-               this.schema = SpecificData.get().getSchema(recordClazz);
-               this.datumWriter = new SpecificDatumWriter<>(schema);
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public byte[] serialize(Row row) {
-               // convert to record
-               final Object record = convertToRecord(schema, row);
-
-               // write
-               try {
-                       arrayOutputStream.reset();
-                       datumWriter.write((GenericRecord) record, encoder);
-                       encoder.flush();
-                       return arrayOutputStream.toByteArray();
-               } catch (IOException e) {
-                       throw new RuntimeException("Failed to serialize Row.", 
e);
-               }
-       }
-
-       private void writeObject(ObjectOutputStream oos) throws IOException {
-               oos.writeObject(recordClazz);
-       }
-
-       @SuppressWarnings("unchecked")
-       private void readObject(ObjectInputStream ois) throws 
ClassNotFoundException, IOException {
-               this.recordClazz = (Class<? extends SpecificRecord>) 
ois.readObject();
-               this.schema = SpecificData.get().getSchema(recordClazz);
-               this.datumWriter = new SpecificDatumWriter<>(schema);
-               this.arrayOutputStream = new ByteArrayOutputStream();
-               this.encoder = 
EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
-       }
-
-       /**
-        * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
-        * Strings are converted into Avro's {@link Utf8} fields.
-        */
-       private static Object convertToRecord(Schema schema, Object rowObj) {
-               if (rowObj instanceof Row) {
-                       // records can be wrapped in a union
-                       if (schema.getType() == Schema.Type.UNION) {
-                               final List<Schema> types = schema.getTypes();
-                               if (types.size() == 2 && types.get(0).getType() 
== Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
-                                       schema = types.get(1);
-                               }
-                               else if (types.size() == 2 && 
types.get(0).getType() == Schema.Type.RECORD && types.get(1).getType() == 
Schema.Type.NULL) {
-                                       schema = types.get(0);
-                               }
-                               else {
-                                       throw new RuntimeException("Currently 
we only support schemas of the following form: UNION[null, RECORD] or 
UNION[RECORD, NULL] Given: " + schema);
-                               }
-                       } else if (schema.getType() != Schema.Type.RECORD) {
-                               throw new RuntimeException("Record type for row 
type expected. But is: " + schema);
-                       }
-                       final List<Schema.Field> fields = schema.getFields();
-                       final GenericRecord record = new 
GenericData.Record(schema);
-                       final Row row = (Row) rowObj;
-                       for (int i = 0; i < fields.size(); i++) {
-                               final Schema.Field field = fields.get(i);
-                               record.put(field.pos(), 
convertToRecord(field.schema(), row.getField(i)));
-                       }
-                       return record;
-               } else if (rowObj instanceof String) {
-                       return new Utf8((String) rowObj);
-               } else {
-                       return rowObj;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
deleted file mode 100644
index 28f2ed3..0000000
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
-import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
-import 
org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.InstantiationUtil;
-
-import org.apache.avro.specific.SpecificRecord;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test for the Avro serialization and deserialization schema.
- */
-public class AvroRowDeSerializationSchemaTest {
-
-       @Test
-       public void testSerializeDeserializeSimpleRow() throws IOException {
-               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> testData = AvroTestUtils.getSimpleTestData();
-
-               final AvroRowSerializationSchema serializationSchema = new 
AvroRowSerializationSchema(testData.f0);
-               final AvroRowDeserializationSchema deserializationSchema = new 
AvroRowDeserializationSchema(testData.f0);
-
-               final byte[] bytes = serializationSchema.serialize(testData.f2);
-               final Row actual = deserializationSchema.deserialize(bytes);
-
-               assertEquals(testData.f2, actual);
-       }
-
-       @Test
-       public void testSerializeSimpleRowSeveralTimes() throws IOException {
-               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> testData = AvroTestUtils.getSimpleTestData();
-
-               final AvroRowSerializationSchema serializationSchema = new 
AvroRowSerializationSchema(testData.f0);
-               final AvroRowDeserializationSchema deserializationSchema = new 
AvroRowDeserializationSchema(testData.f0);
-
-               serializationSchema.serialize(testData.f2);
-               serializationSchema.serialize(testData.f2);
-               final byte[] bytes = serializationSchema.serialize(testData.f2);
-               final Row actual = deserializationSchema.deserialize(bytes);
-
-               assertEquals(testData.f2, actual);
-       }
-
-       @Test
-       public void testDeserializeRowSeveralTimes() throws IOException {
-               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> testData = AvroTestUtils.getSimpleTestData();
-
-               final AvroRowSerializationSchema serializationSchema = new 
AvroRowSerializationSchema(testData.f0);
-               final AvroRowDeserializationSchema deserializationSchema = new 
AvroRowDeserializationSchema(testData.f0);
-
-               final byte[] bytes = serializationSchema.serialize(testData.f2);
-               deserializationSchema.deserialize(bytes);
-               deserializationSchema.deserialize(bytes);
-               final Row actual = deserializationSchema.deserialize(bytes);
-
-               assertEquals(testData.f2, actual);
-       }
-
-       @Test
-       public void testSerializeDeserializeComplexRow() throws IOException {
-               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> testData = AvroTestUtils.getComplexTestData();
-
-               final AvroRowSerializationSchema serializationSchema = new 
AvroRowSerializationSchema(testData.f0);
-               final AvroRowDeserializationSchema deserializationSchema = new 
AvroRowDeserializationSchema(testData.f0);
-
-               final byte[] bytes = serializationSchema.serialize(testData.f2);
-               final Row actual = deserializationSchema.deserialize(bytes);
-
-               assertEquals(testData.f2, actual);
-       }
-
-       @Test
-       public void testSerializeComplexRowSeveralTimes() throws IOException {
-               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> testData = AvroTestUtils.getComplexTestData();
-
-               final AvroRowSerializationSchema serializationSchema = new 
AvroRowSerializationSchema(testData.f0);
-               final AvroRowDeserializationSchema deserializationSchema = new 
AvroRowDeserializationSchema(testData.f0);
-
-               serializationSchema.serialize(testData.f2);
-               serializationSchema.serialize(testData.f2);
-               final byte[] bytes = serializationSchema.serialize(testData.f2);
-               final Row actual = deserializationSchema.deserialize(bytes);
-
-               assertEquals(testData.f2, actual);
-       }
-
-       @Test
-       public void testDeserializeComplexRowSeveralTimes() throws IOException {
-               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> testData = AvroTestUtils.getComplexTestData();
-
-               final AvroRowSerializationSchema serializationSchema = new 
AvroRowSerializationSchema(testData.f0);
-               final AvroRowDeserializationSchema deserializationSchema = new 
AvroRowDeserializationSchema(testData.f0);
-
-               final byte[] bytes = serializationSchema.serialize(testData.f2);
-               deserializationSchema.deserialize(bytes);
-               deserializationSchema.deserialize(bytes);
-               final Row actual = deserializationSchema.deserialize(bytes);
-
-               assertEquals(testData.f2, actual);
-       }
-
-       @Test
-       public void testSerializability() throws IOException, 
ClassNotFoundException {
-               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> testData = AvroTestUtils.getComplexTestData();
-
-               final AvroRowSerializationSchema serOrig = new 
AvroRowSerializationSchema(testData.f0);
-               final AvroRowDeserializationSchema deserOrig = new 
AvroRowDeserializationSchema(testData.f0);
-
-               byte[] serBytes = InstantiationUtil.serializeObject(serOrig);
-               byte[] deserBytes = 
InstantiationUtil.serializeObject(deserOrig);
-
-               AvroRowSerializationSchema serCopy =
-                       InstantiationUtil.deserializeObject(serBytes, 
Thread.currentThread().getContextClassLoader());
-               AvroRowDeserializationSchema deserCopy =
-                       InstantiationUtil.deserializeObject(deserBytes, 
Thread.currentThread().getContextClassLoader());
-
-               final byte[] bytes = serCopy.serialize(testData.f2);
-               deserCopy.deserialize(bytes);
-               deserCopy.deserialize(bytes);
-               final Row actual = deserCopy.deserialize(bytes);
-
-               assertEquals(testData.f2, actual);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
index def16b2..871a6f6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
+import org.apache.flink.formats.avro.utils.AvroTestUtils;
 import org.apache.flink.table.api.Types;
 
 import org.apache.avro.Schema;

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
deleted file mode 100644
index a41125a..0000000
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.io.avro.generated.Address;
-import org.apache.flink.api.io.avro.generated.Colors;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.types.Row;
-
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.specific.SpecificRecord;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-
-/**
- * Utilities for creating Avro Schemas.
- */
-public final class AvroTestUtils {
-
-       private static final String NAMESPACE = 
"org.apache.flink.streaming.connectors.kafka";
-
-       /**
-        * Creates a flat Avro Schema for testing.
-        */
-       public static Schema createFlatAvroSchema(String[] fieldNames, 
TypeInformation[] fieldTypes) {
-               final SchemaBuilder.FieldAssembler<Schema> fieldAssembler = 
SchemaBuilder
-                       .record("BasicAvroRecord")
-                       .namespace(NAMESPACE)
-                       .fields();
-
-               final Schema nullSchema = Schema.create(Schema.Type.NULL);
-
-               for (int i = 0; i < fieldNames.length; i++) {
-                       Schema schema = 
ReflectData.get().getSchema(fieldTypes[i].getTypeClass());
-                       Schema unionSchema = 
Schema.createUnion(Arrays.asList(nullSchema, schema));
-                       
fieldAssembler.name(fieldNames[i]).type(unionSchema).noDefault();
-               }
-
-               return fieldAssembler.endRecord();
-       }
-
-       /**
-        * Tests a simple Avro data types without nesting.
-        */
-       public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> getSimpleTestData() {
-               final Address addr = Address.newBuilder()
-                       .setNum(42)
-                       .setStreet("Main Street 42")
-                       .setCity("Test City")
-                       .setState("Test State")
-                       .setZip("12345")
-                       .build();
-
-               final Row rowAddr = new Row(5);
-               rowAddr.setField(0, 42);
-               rowAddr.setField(1, "Main Street 42");
-               rowAddr.setField(2, "Test City");
-               rowAddr.setField(3, "Test State");
-               rowAddr.setField(4, "12345");
-
-               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> t = new Tuple3<>();
-               t.f0 = Address.class;
-               t.f1 = addr;
-               t.f2 = rowAddr;
-
-               return t;
-       }
-
-       /**
-        * Tests all Avro data types as well as nested types.
-        */
-       public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> getComplexTestData() {
-               final Address addr = Address.newBuilder()
-                       .setNum(42)
-                       .setStreet("Main Street 42")
-                       .setCity("Test City")
-                       .setState("Test State")
-                       .setZip("12345")
-                       .build();
-
-               final Row rowAddr = new Row(5);
-               rowAddr.setField(0, 42);
-               rowAddr.setField(1, "Main Street 42");
-               rowAddr.setField(2, "Test City");
-               rowAddr.setField(3, "Test State");
-               rowAddr.setField(4, "12345");
-
-               final User user = User.newBuilder()
-                       .setName("Charlie")
-                       .setFavoriteNumber(null)
-                       .setFavoriteColor("blue")
-                       .setTypeLongTest(1337L)
-                       .setTypeDoubleTest(1.337d)
-                       .setTypeNullTest(null)
-                       .setTypeBoolTest(false)
-                       .setTypeArrayString(new ArrayList<CharSequence>())
-                       .setTypeArrayBoolean(new ArrayList<Boolean>())
-                       .setTypeNullableArray(null)
-                       .setTypeEnum(Colors.RED)
-                       .setTypeMap(new HashMap<CharSequence, Long>())
-                       .setTypeFixed(null)
-                       .setTypeUnion(null)
-                       .setTypeNested(addr)
-                       .build();
-
-               final Row rowUser = new Row(15);
-               rowUser.setField(0, "Charlie");
-               rowUser.setField(1, null);
-               rowUser.setField(2, "blue");
-               rowUser.setField(3, 1337L);
-               rowUser.setField(4, 1.337d);
-               rowUser.setField(5, null);
-               rowUser.setField(6, false);
-               rowUser.setField(7, new ArrayList<CharSequence>());
-               rowUser.setField(8, new ArrayList<Boolean>());
-               rowUser.setField(9, null);
-               rowUser.setField(10, Colors.RED);
-               rowUser.setField(11, new HashMap<CharSequence, Long>());
-               rowUser.setField(12, null);
-               rowUser.setField(13, null);
-               rowUser.setField(14, rowAddr);
-
-               final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, 
Row> t = new Tuple3<>();
-               t.f0 = User.class;
-               t.f1 = user;
-               t.f2 = rowUser;
-
-               return t;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 97c9f20..7468b67 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -36,7 +36,6 @@ under the License.
        <packaging>pom</packaging>
 
        <modules>
-               <module>flink-avro</module>
                <module>flink-jdbc</module>
                <module>flink-hadoop-compatibility</module>
                <module>flink-hbase</module>

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index ae3f56e..0ca742c 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -80,12 +80,6 @@ under the License.
                        <!-- managed version -->
                </dependency>
 
-               <!-- Avro is needed for the interoperability with Avro types for serialization -->
-               <dependency>
-                       <groupId>org.apache.avro</groupId>
-                       <artifactId>avro</artifactId>
-               </dependency>
-
                <!-- We explicitly depend on snappy since connectors that require it load it through the system class loader -->
                <dependency>
                        <groupId>org.xerial.snappy</groupId>
@@ -128,7 +122,7 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
-    </dependencies>
+       </dependencies>
 
        <profiles>
                <profile>
@@ -209,6 +203,7 @@ under the License.
                                                        
<exclude>org.apache.flink.core.fs.FileSystem#isFlinkSupportedScheme(java.lang.String)</exclude>
                                                        
<exclude>org.apache.flink.core.fs.FileSystem#setDefaultScheme(org.apache.flink.configuration.Configuration)</exclude>
                                                        
<exclude>org.apache.flink.api.java.typeutils.WritableTypeInfo</exclude>
+                                                       
<exclude>org.apache.flink.api.java.typeutils.AvroTypeInfo</exclude>
                                                        <!-- Breaking changes between 1.1 and 1.2.
                                                        We ignore these changes because these are low-level, internal runtime configuration parameters -->
                                                        
<exclude>org.apache.flink.configuration.ConfigConstants#DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE</exclude>

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index fc66ccd..88d524e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -571,16 +571,24 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
        }
 
        /**
-        * Force Flink to use the AvroSerializer for POJOs.
+        * Forces Flink to use the Apache Avro serializer for POJOs.
+        *
+        * <p><b>Important:</b> Make sure to include the <i>flink-avro</i> module.
         */
        public void enableForceAvro() {
                forceAvro = true;
        }
 
+       /**
+        * Disables the Apache Avro serializer as the forced serializer for POJOs.
+        */
        public void disableForceAvro() {
                forceAvro = false;
        }
 
+       /**
+        * Returns whether the Apache Avro serializer is forced as the serializer for POJOs.
+        */
        public boolean isForceAvroEnabled() {
                return forceAvro;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
deleted file mode 100644
index 1356e53..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Special type information to generate a special AvroTypeInfo for Avro POJOs 
(implementing SpecificRecordBase, the typed Avro POJOs)
- *
- * Proceeding: It uses a regular pojo type analysis and replaces all {@code 
GenericType<CharSequence>}
- *     with a {@code GenericType<avro.Utf8>}.
- * All other types used by Avro are standard Java types.
- * Only strings are represented as CharSequence fields and represented as Utf8 
classes at runtime.
- * CharSequence is not comparable. To make them nicely usable with field 
expressions, we replace them here
- * by generic type infos containing Utf8 classes (which are comparable),
- *
- * This class is checked by the AvroPojoTest.
- * @param <T>
- */
-@Public
-public class AvroTypeInfo<T extends SpecificRecordBase> extends 
PojoTypeInfo<T> {
-       @PublicEvolving
-       public AvroTypeInfo(Class<T> typeClass) {
-               super(typeClass, generateFieldsFromAvroSchema(typeClass));
-       }
-
-       private static <T extends SpecificRecordBase> List<PojoField> 
generateFieldsFromAvroSchema(Class<T> typeClass) {
-               PojoTypeExtractor pte = new PojoTypeExtractor();
-               ArrayList<Type> typeHierarchy = new ArrayList<>();
-               typeHierarchy.add(typeClass);
-               TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, 
null, null, null);
-
-               if(!(ti instanceof PojoTypeInfo)) {
-                       throw new IllegalStateException("Expecting type to be a 
PojoTypeInfo");
-               }
-               PojoTypeInfo pti =  (PojoTypeInfo) ti;
-               List<PojoField> newFields = new 
ArrayList<>(pti.getTotalFields());
-
-               for(int i = 0; i < pti.getArity(); i++) {
-                       PojoField f = pti.getPojoFieldAt(i);
-                       TypeInformation newType = f.getTypeInformation();
-                       // check if type is a CharSequence
-                       if(newType instanceof GenericTypeInfo) {
-                               
if((newType).getTypeClass().equals(CharSequence.class)) {
-                                       // replace the type by a 
org.apache.avro.util.Utf8
-                                       newType = new 
GenericTypeInfo(org.apache.avro.util.Utf8.class);
-                               }
-                       }
-                       PojoField newField = new PojoField(f.getField(), 
newType);
-                       newFields.add(newField);
-               }
-               return newFields;
-       }
-
-       private static class PojoTypeExtractor extends TypeExtractor {
-               private PojoTypeExtractor() {
-                       super();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 8a4fbbe..b24f425 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -27,12 +27,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
 import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -300,15 +301,32 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 
        @Override
        @PublicEvolving
+       @SuppressWarnings("unchecked")
        public TypeSerializer<T> createSerializer(ExecutionConfig config) {
                if(config.isForceKryoEnabled()) {
-                       return new KryoSerializer<T>(getTypeClass(), config);
+                       return new KryoSerializer<>(getTypeClass(), config);
                }
+
                if(config.isForceAvroEnabled()) {
-                       return new AvroSerializer<T>(getTypeClass());
+                       Class<?> clazz;
+                       try {
+                               clazz = 
Class.forName("org.apache.flink.formats.avro.typeutils.AvroSerializer");
+                       } catch (ClassNotFoundException e) {
+                               throw new RuntimeException("Could not load the 
AvroSerializer class. " +
+                                       "You may be missing the 'flink-avro' 
dependency.");
+                       }
+
+                       try {
+                               Constructor<?> constructor = 
clazz.getConstructor(Class.class);
+                               return (TypeSerializer<T>) 
constructor.newInstance(getTypeClass());
+                       } catch (NoSuchMethodException | IllegalAccessException 
| InstantiationException e) {
+                               throw new RuntimeException("Incompatible 
versions of the Avro classes found.");
+                       } catch (InvocationTargetException e) {
+                               throw new RuntimeException("Cannot create 
AvroSerializer.", e.getTargetException());
+                       }
                }
 
-               TypeSerializer<?>[] fieldSerializers = new 
TypeSerializer<?>[fields.length ];
+               TypeSerializer<?>[] fieldSerializers = new 
TypeSerializer<?>[fields.length];
                Field[] reflectiveFields = new Field[fields.length];
 
                for (int i = 0; i < fields.length; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
index 41d260d..c5c2565 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
@@ -287,4 +287,39 @@ public class TypeExtractionUtils {
                        ((TypeVariable<?>) 
t1).getName().equals(((TypeVariable<?>) t2).getName()) &&
                        ((TypeVariable<?>) 
t1).getGenericDeclaration().equals(((TypeVariable<?>) 
t2).getGenericDeclaration());
        }
+
+       /**
+        * Traverses the type hierarchy of a type up until a certain stop class 
is found.
+        *
+        * @param t type for which a hierarchy need to be created
+        * @return type of the immediate child of the stop class
+        */
+       public static Type getTypeHierarchy(List<Type> typeHierarchy, Type t, 
Class<?> stopAtClass) {
+               while (!(isClassType(t) && typeToClass(t).equals(stopAtClass))) 
{
+                       typeHierarchy.add(t);
+                       t = typeToClass(t).getGenericSuperclass();
+
+                       if (t == null) {
+                               break;
+                       }
+               }
+               return t;
+       }
+
+       /**
+        * Returns true if the given class has a superclass of given name.
+        *
+        * @param clazz class to be analyzed
+        * @param superClassName class name of the super class
+        */
+       public static boolean hasSuperclass(Class<?> clazz, String 
superClassName) {
+               List<Type> hierarchy = new ArrayList<>();
+               getTypeHierarchy(hierarchy, clazz, Object.class);
+               for (Type t : hierarchy) {
+                       if (isClassType(t) && 
typeToClass(t).getName().equals(superClassName)) {
+                               return true;
+                       }
+               }
+               return false;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index c50dfc9..1a9cecb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
@@ -73,6 +72,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.getTypeHierarchy;
+import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
 import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
@@ -114,6 +115,10 @@ public class TypeExtractor {
 
        private static final String HADOOP_WRITABLE_TYPEINFO_CLASS = 
"org.apache.flink.api.java.typeutils.WritableTypeInfo";
 
+       private static final String AVRO_SPECIFIC_RECORD_BASE_CLASS = 
"org.apache.avro.specific.SpecificRecordBase";
+
+       private static final String AVRO_TYPEINFO_CLASS = 
"org.apache.flink.formats.avro.typeutils.AvroTypeInfo";
+
        private static final Logger LOG = 
LoggerFactory.getLogger(TypeExtractor.class);
 
        public static final int[] NO_INDEX = new int[] {};
@@ -1583,24 +1588,6 @@ public class TypeExtractor {
        }
 
        /**
-        * Traverses the type hierarchy of a type up until a certain stop class 
is found.
-        *
-        * @param t type for which a hierarchy need to be created
-        * @return type of the immediate child of the stop class
-        */
-       private static Type getTypeHierarchy(ArrayList<Type> typeHierarchy, 
Type t, Class<?> stopAtClass) {
-               while (!(isClassType(t) && typeToClass(t).equals(stopAtClass))) 
{
-                       typeHierarchy.add(t);
-                       t = typeToClass(t).getGenericSuperclass();
-
-                       if (t == null) {
-                               break;
-                       }
-               }
-               return t;
-       }
-
-       /**
         * Traverses the type hierarchy up until a type information factory can 
be found.
         *
         * @param typeHierarchy hierarchy to be filled while traversing up
@@ -1806,8 +1793,8 @@ public class TypeExtractor {
                }
 
                // special case for POJOs generated by Avro.
-               if(SpecificRecordBase.class.isAssignableFrom(clazz)) {
-                       return new AvroTypeInfo(clazz);
+               if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
+                       return createAvroTypeInfo(clazz);
                }
 
                if (Modifier.isInterface(clazz.getModifiers())) {
@@ -2119,7 +2106,7 @@ public class TypeExtractor {
        private static boolean hasHadoopWritableInterface(Class<?> clazz,  
HashSet<Class<?>> alreadySeen) {
                Class<?>[] interfaces = clazz.getInterfaces();
                for (Class<?> c : interfaces) {
-                       if 
(c.getName().equals("org.apache.hadoop.io.Writable")) {
+                       if (c.getName().equals(HADOOP_WRITABLE_CLASS)) {
                                return true;
                        }
                        else if (alreadySeen.add(c) && 
hasHadoopWritableInterface(c, alreadySeen)) {
@@ -2155,7 +2142,7 @@ public class TypeExtractor {
                        throw new RuntimeException("Incompatible versions of 
the Hadoop Compatibility classes found.");
                }
                catch (InvocationTargetException e) {
-                       throw new RuntimeException("Cannot create Hadoop 
Writable Type info", e.getTargetException());
+                       throw new RuntimeException("Cannot create Hadoop 
WritableTypeInfo.", e.getTargetException());
                }
        }
 
@@ -2171,7 +2158,7 @@ public class TypeExtractor {
                                // this is actually a writable type info
                                // check if the type is a writable
                                if (!(type instanceof Class && 
isHadoopWritable((Class<?>) type))) {
-                                       throw new 
InvalidTypesException(HADOOP_WRITABLE_CLASS + " type expected");
+                                       throw new 
InvalidTypesException(HADOOP_WRITABLE_CLASS + " type expected.");
                                }
 
                                // check writable type contents
@@ -2188,4 +2175,33 @@ public class TypeExtractor {
                        // ignore
                }
        }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities to handle Avro's 'SpecificRecord' type via reflection
+       // 
------------------------------------------------------------------------
+
+       private static <T> TypeInformation<T> createAvroTypeInfo(Class<T> 
clazz) {
+               Class<?> typeInfoClass;
+               try {
+                       typeInfoClass = Class.forName(AVRO_TYPEINFO_CLASS, 
false, TypeExtractor.class.getClassLoader());
+               }
+               catch (ClassNotFoundException e) {
+                       throw new RuntimeException("Could not load the 
TypeInformation for the class '"
+                                       + AVRO_TYPEINFO_CLASS + "'. You may be 
missing the 'flink-avro' dependency.");
+               }
+
+               try {
+                       Constructor<?> constr = 
typeInfoClass.getConstructor(Class.class);
+
+                       @SuppressWarnings("unchecked")
+                       TypeInformation<T> typeInfo = (TypeInformation<T>) 
constr.newInstance(clazz);
+                       return typeInfo;
+               }
+               catch (NoSuchMethodException | IllegalAccessException | 
InstantiationException e) {
+                       throw new RuntimeException("Incompatible versions of 
the Avro classes found.");
+               }
+               catch (InvocationTargetException e) {
+                       throw new RuntimeException("Cannot create 
AvroTypeInfo.", e.getTargetException());
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
deleted file mode 100644
index 565bd4d..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.util.Utf8;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.flink.util.Preconditions;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * General purpose serialization. Currently using Apache Avro's 
Reflect-serializers for serialization and
- * Kryo for deep object copies. We want to change this to Kryo-only.
- *
- * @param <T> The type serialized.
- */
-@Internal
-public final class AvroSerializer<T> extends TypeSerializer<T> {
-
-       private static final long serialVersionUID = 1L;
-       
-       private final Class<T> type;
-       
-       private final Class<? extends T> typeToInstantiate;
-
-       /**
-        * Map of class tag (using classname as tag) to their Kryo registration.
-        *
-        * <p>This map serves as a preview of the final registration result of
-        * the Kryo instance, taking into account registration overwrites.
-        */
-       private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
-       
-       private transient ReflectDatumWriter<T> writer;
-       private transient ReflectDatumReader<T> reader;
-       
-       private transient DataOutputEncoder encoder;
-       private transient DataInputDecoder decoder;
-       
-       private transient Kryo kryo;
-       
-       private transient T deepCopyInstance;
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public AvroSerializer(Class<T> type) {
-               this(type, type);
-       }
-       
-       public AvroSerializer(Class<T> type, Class<? extends T> 
typeToInstantiate) {
-               this.type = checkNotNull(type);
-               this.typeToInstantiate = checkNotNull(typeToInstantiate);
-               
-               InstantiationUtil.checkForInstantiation(typeToInstantiate);
-
-               this.kryoRegistrations = buildKryoRegistrations(type);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public boolean isImmutableType() {
-               return false;
-       }
-
-       @Override
-       public AvroSerializer<T> duplicate() {
-               return new AvroSerializer<T>(type, typeToInstantiate);
-       }
-       
-       @Override
-       public T createInstance() {
-               return InstantiationUtil.instantiate(this.typeToInstantiate);
-       }
-
-       @Override
-       public T copy(T from) {
-               checkKryoInitialized();
-
-               return KryoUtils.copy(from, kryo, this);
-       }
-       
-       @Override
-       public T copy(T from, T reuse) {
-               checkKryoInitialized();
-
-               return KryoUtils.copy(from, reuse, kryo, this);
-       }
-
-       @Override
-       public int getLength() {
-               return -1;
-       }
-
-       @Override
-       public void serialize(T value, DataOutputView target) throws 
IOException {
-               checkAvroInitialized();
-               this.encoder.setOut(target);
-               this.writer.write(value, this.encoder);
-       }
-       
-       @Override
-       public T deserialize(DataInputView source) throws IOException {
-               checkAvroInitialized();
-               this.decoder.setIn(source);
-               return this.reader.read(null, this.decoder);
-       }
-
-       @Override
-       public T deserialize(T reuse, DataInputView source) throws IOException {
-               checkAvroInitialized();
-               this.decoder.setIn(source);
-               return this.reader.read(reuse, this.decoder);
-       }
-
-       @Override
-       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               checkAvroInitialized();
-               
-               if (this.deepCopyInstance == null) {
-                       this.deepCopyInstance = 
InstantiationUtil.instantiate(type, Object.class);
-               }
-               
-               this.decoder.setIn(source);
-               this.encoder.setOut(target);
-               
-               T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
-               this.writer.write(tmp, this.encoder);
-       }
-       
-       
-       private void checkAvroInitialized() {
-               if (this.reader == null) {
-                       this.reader = new ReflectDatumReader<T>(type);
-                       this.writer = new ReflectDatumWriter<T>(type);
-                       this.encoder = new DataOutputEncoder();
-                       this.decoder = new DataInputDecoder();
-               }
-       }
-       
-       private void checkKryoInitialized() {
-               if (this.kryo == null) {
-                       this.kryo = new Kryo();
-
-                       Kryo.DefaultInstantiatorStrategy instantiatorStrategy = 
new Kryo.DefaultInstantiatorStrategy();
-                       
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
-                       kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-                       kryo.setAsmEnabled(true);
-
-                       KryoUtils.applyRegistrations(kryo, 
kryoRegistrations.values());
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public int hashCode() {
-               return 31 * this.type.hashCode() + 
this.typeToInstantiate.hashCode();
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if (obj instanceof AvroSerializer) {
-                       @SuppressWarnings("unchecked")
-                       AvroSerializer<T> avroSerializer = (AvroSerializer<T>) 
obj;
-
-                       return avroSerializer.canEqual(this) &&
-                               type == avroSerializer.type &&
-                               typeToInstantiate == 
avroSerializer.typeToInstantiate;
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public boolean canEqual(Object obj) {
-               return obj instanceof AvroSerializer;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // Serializer configuration snapshotting & compatibility
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
-               return new AvroSerializerConfigSnapshot<>(type, 
typeToInstantiate, kryoRegistrations);
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-               if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
-                       final AvroSerializerConfigSnapshot<T> config = 
(AvroSerializerConfigSnapshot<T>) configSnapshot;
-
-                       if (type.equals(config.getTypeClass()) && 
typeToInstantiate.equals(config.getTypeToInstantiate())) {
-                               // resolve Kryo registrations; currently, since 
the Kryo registrations in Avro
-                               // are fixed, there shouldn't be a problem with 
the resolution here.
-
-                               LinkedHashMap<String, KryoRegistration> 
oldRegistrations = config.getKryoRegistrations();
-                               oldRegistrations.putAll(kryoRegistrations);
-
-                               for (Map.Entry<String, KryoRegistration> 
reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
-                                       if 
(reconfiguredRegistrationEntry.getValue().isDummy()) {
-                                               return 
CompatibilityResult.requiresMigration();
-                                       }
-                               }
-
-                               this.kryoRegistrations = oldRegistrations;
-                               return CompatibilityResult.compatible();
-                       }
-               }
-
-               // ends up here if the preceding serializer is not
-               // the ValueSerializer, or serialized data type has changed
-               return CompatibilityResult.requiresMigration();
-       }
-
-       public static class AvroSerializerConfigSnapshot<T> extends 
KryoRegistrationSerializerConfigSnapshot<T> {
-
-               private static final int VERSION = 1;
-
-               private Class<? extends T> typeToInstantiate;
-
-               public AvroSerializerConfigSnapshot() {}
-
-               public AvroSerializerConfigSnapshot(
-                               Class<T> baseType,
-                               Class<? extends T> typeToInstantiate,
-                               LinkedHashMap<String, KryoRegistration> 
kryoRegistrations) {
-
-                       super(baseType, kryoRegistrations);
-                       this.typeToInstantiate = 
Preconditions.checkNotNull(typeToInstantiate);
-               }
-
-               @Override
-               public void write(DataOutputView out) throws IOException {
-                       super.write(out);
-
-                       out.writeUTF(typeToInstantiate.getName());
-               }
-
-               @SuppressWarnings("unchecked")
-               @Override
-               public void read(DataInputView in) throws IOException {
-                       super.read(in);
-
-                       String classname = in.readUTF();
-                       try {
-                               typeToInstantiate = (Class<? extends T>) 
Class.forName(classname, true, getUserCodeClassLoader());
-                       } catch (ClassNotFoundException e) {
-                               throw new IOException("Cannot find requested 
class " + classname + " in classpath.", e);
-                       }
-               }
-
-               @Override
-               public int getVersion() {
-                       return VERSION;
-               }
-
-               public Class<? extends T> getTypeToInstantiate() {
-                       return typeToInstantiate;
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               in.defaultReadObject();
-
-               // kryoRegistrations may be null if this Avro serializer is 
deserialized from an old version
-               if (kryoRegistrations == null) {
-                       this.kryoRegistrations = buildKryoRegistrations(type);
-               }
-       }
-
-       private static <T> LinkedHashMap<String, KryoRegistration> 
buildKryoRegistrations(Class<T> serializedDataType) {
-               final LinkedHashMap<String, KryoRegistration> registrations = 
new LinkedHashMap<>();
-
-               // register Avro types.
-               registrations.put(
-                               GenericData.Array.class.getName(),
-                               new KryoRegistration(
-                                               GenericData.Array.class,
-                                               new 
ExecutionConfig.SerializableSerializer<>(new 
Serializers.SpecificInstanceCollectionSerializerForArrayList())));
-               registrations.put(Utf8.class.getName(), new 
KryoRegistration(Utf8.class));
-               registrations.put(GenericData.EnumSymbol.class.getName(), new 
KryoRegistration(GenericData.EnumSymbol.class));
-               registrations.put(GenericData.Fixed.class.getName(), new 
KryoRegistration(GenericData.Fixed.class));
-               registrations.put(GenericData.StringType.class.getName(), new 
KryoRegistration(GenericData.StringType.class));
-
-               // register the serialized data type
-               registrations.put(serializedDataType.getName(), new 
KryoRegistration(serializedDataType));
-
-               return registrations;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
deleted file mode 100644
index c0454c6..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Decoder;
-import org.apache.avro.util.Utf8;
-import org.apache.flink.annotation.Internal;
-
-@Internal
-public class DataInputDecoder extends Decoder implements java.io.Serializable {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private transient Utf8 stringDecoder = new Utf8();
-       
-       
-       private transient DataInput in;
-       
-       
-       public void setIn(DataInput in) {
-               this.in = in;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // primitives
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void readNull() {}
-       
-
-       @Override
-       public boolean readBoolean() throws IOException {
-               return in.readBoolean();
-       }
-
-       @Override
-       public int readInt() throws IOException {
-               return in.readInt();
-       }
-
-       @Override
-       public long readLong() throws IOException {
-               return in.readLong();
-       }
-
-       @Override
-       public float readFloat() throws IOException {
-               return in.readFloat();
-       }
-
-       @Override
-       public double readDouble() throws IOException {
-               return in.readDouble();
-       }
-       
-       @Override
-       public int readEnum() throws IOException {
-               return readInt();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // bytes
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void readFixed(byte[] bytes, int start, int length) throws 
IOException {
-               in.readFully(bytes, start, length);
-       }
-       
-       @Override
-       public ByteBuffer readBytes(ByteBuffer old) throws IOException {
-               int length = readInt();
-               ByteBuffer result;
-               if (old != null && length <= old.capacity() && old.hasArray()) {
-                       result = old;
-                       result.clear();
-               } else {
-                       result = ByteBuffer.allocate(length);
-               }
-               in.readFully(result.array(), result.arrayOffset() + 
result.position(), length);
-               result.limit(length);
-               return result;
-       }
-       
-       
-       @Override
-       public void skipFixed(int length) throws IOException {
-               skipBytes(length);
-       }
-       
-       @Override
-       public void skipBytes() throws IOException {
-               int num = readInt();
-               skipBytes(num);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // strings
-       // 
--------------------------------------------------------------------------------------------
-       
-       
-       @Override
-       public Utf8 readString(Utf8 old) throws IOException {
-               int length = readInt();
-               Utf8 result = (old != null ? old : new Utf8());
-               result.setByteLength(length);
-               
-               if (length > 0) {
-                       in.readFully(result.getBytes(), 0, length);
-               }
-               
-               return result;
-       }
-
-       @Override
-       public String readString() throws IOException {
-               return readString(stringDecoder).toString();
-       }
-
-       @Override
-       public void skipString() throws IOException {
-               int len = readInt();
-               skipBytes(len);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // collection types
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public long readArrayStart() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long arrayNext() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long skipArray() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long readMapStart() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long mapNext() throws IOException {
-               return readVarLongCount(in);
-       }
-
-       @Override
-       public long skipMap() throws IOException {
-               return readVarLongCount(in);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // union
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public int readIndex() throws IOException {
-               return readInt();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // utils
-       // 
--------------------------------------------------------------------------------------------
-       
-       private void skipBytes(int num) throws IOException {
-               while (num > 0) {
-                       num -= in.skipBytes(num);
-               }
-       }
-       
-       public static long readVarLongCount(DataInput in) throws IOException {
-               long value = in.readUnsignedByte();
-
-               if ((value & 0x80) == 0) {
-                       return value;
-               }
-               else {
-                       long curr;
-                       int shift = 7;
-                       value = value & 0x7f;
-                       while (((curr = in.readUnsignedByte()) & 0x80) != 0){
-                               value |= (curr & 0x7f) << shift;
-                               shift += 7;
-                       }
-                       value |= curr << shift;
-                       return value;
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // serialization
-       // 
--------------------------------------------------------------------------------------------
-       
-       private void readObject(java.io.ObjectInputStream s) throws 
java.io.IOException, ClassNotFoundException {
-               // Read in size, and any hidden stuff
-               s.defaultReadObject();
-               
-               this.stringDecoder = new Utf8();
-               this.in = null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
deleted file mode 100644
index c41b648..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Encoder;
-import org.apache.avro.util.Utf8;
-import org.apache.flink.annotation.Internal;
-
-@Internal
-public final class DataOutputEncoder extends Encoder implements 
java.io.Serializable {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private transient DataOutput out;
-       
-       
-       public void setOut(DataOutput out) {
-               this.out = out;
-       }
-
-
-       @Override
-       public void flush() throws IOException {}
-
-       // 
--------------------------------------------------------------------------------------------
-       // primitives
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void writeNull() {}
-       
-
-       @Override
-       public void writeBoolean(boolean b) throws IOException {
-               out.writeBoolean(b);
-       }
-
-       @Override
-       public void writeInt(int n) throws IOException {
-               out.writeInt(n);
-       }
-
-       @Override
-       public void writeLong(long n) throws IOException {
-               out.writeLong(n);
-       }
-
-       @Override
-       public void writeFloat(float f) throws IOException {
-               out.writeFloat(f);
-       }
-
-       @Override
-       public void writeDouble(double d) throws IOException {
-               out.writeDouble(d);
-       }
-       
-       @Override
-       public void writeEnum(int e) throws IOException {
-               out.writeInt(e);
-       }
-       
-       
-       // 
--------------------------------------------------------------------------------------------
-       // bytes
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void writeFixed(byte[] bytes, int start, int len) throws 
IOException {
-               out.write(bytes, start, len);
-       }
-       
-       @Override
-       public void writeBytes(byte[] bytes, int start, int len) throws 
IOException {
-               out.writeInt(len);
-               if (len > 0) {
-                       out.write(bytes, start, len);
-               }
-       }
-       
-       @Override
-       public void writeBytes(ByteBuffer bytes) throws IOException {
-               int num = bytes.remaining();
-               out.writeInt(num);
-               
-               if (num > 0) {
-                       writeFixed(bytes);
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // strings
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void writeString(String str) throws IOException {
-               byte[] bytes = Utf8.getBytesFor(str);
-               writeBytes(bytes, 0, bytes.length);
-       }
-       
-       @Override
-       public void writeString(Utf8 utf8) throws IOException {
-               writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
-               
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // collection types
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void writeArrayStart() {}
-
-       @Override
-       public void setItemCount(long itemCount) throws IOException {
-               if (itemCount > 0) {
-                       writeVarLongCount(out, itemCount);
-               }
-       }
-
-       @Override
-       public void startItem() {}
-
-       @Override
-       public void writeArrayEnd() throws IOException {
-               // write a single byte 0, shortcut for a var-length long of 0
-               out.write(0);
-       }
-
-       @Override
-       public void writeMapStart() {}
-
-       @Override
-       public void writeMapEnd() throws IOException {
-               // write a single byte 0, shortcut for a var-length long of 0
-               out.write(0);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // union
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void writeIndex(int unionIndex) throws IOException {
-               out.writeInt(unionIndex);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // utils
-       // 
--------------------------------------------------------------------------------------------
-               
-       
-       public static final void writeVarLongCount(DataOutput out, long val) 
throws IOException {
-               if (val < 0) {
-                       throw new IOException("Illegal count (must be 
non-negative): " + val);
-               }
-               
-               while ((val & ~0x7FL) != 0) {
-                       out.write(((int) val) | 0x80);
-                       val >>>= 7;
-               }
-               out.write((int) val);
-       }
-       
-       private void readObject(java.io.ObjectInputStream s) throws 
java.io.IOException, ClassNotFoundException {
-               // Read in size, and any hidden stuff
-               s.defaultReadObject();
-
-               this.out = null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 6730136..269cf35 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -24,8 +24,6 @@ import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
-import org.apache.avro.generic.GenericData;
-
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
@@ -406,7 +404,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                                }
 
                                // there's actually no way to tell if new Kryo 
serializers are compatible with
-                               // the previous ones they overwrite; we can 
only signal compatibly and hope for the best
+                               // the previous ones they overwrite; we can 
only signal compatibility and hope for the best
                                this.kryoRegistrations = 
reconfiguredRegistrations;
                                return CompatibilityResult.compatible();
                        }
@@ -478,11 +476,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                                                        
registeredTypeWithSerializerEntry.getValue()));
                }
 
-               kryoRegistrations.put(
-                               GenericData.Array.class.getName(),
-                               new KryoRegistration(
-                                               GenericData.Array.class,
-                                               new 
ExecutionConfig.SerializableSerializer<>(new 
Serializers.SpecificInstanceCollectionSerializerForArrayList())));
+               // add Avro support if flink-avro is available; a dummy 
otherwise
+               
Serializers.addAvroGenericDataArrayRegistration(kryoRegistrations);
 
                return kryoRegistrations;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
index 4976d6a..de7b2fc 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
@@ -18,16 +18,6 @@
 
 package org.apache.flink.api.java.typeutils.runtime.kryo;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.CollectionSerializer;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.specific.SpecificRecordBase;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -35,18 +25,29 @@ import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
 
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Modifier;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Set;
 
+import static 
org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
+
 
 /**
  * Class containing utilities for the serializers of the Flink Runtime.
@@ -60,6 +61,14 @@ import java.util.Set;
 @Internal
 public class Serializers {
 
+       private static final String AVRO_SPECIFIC_RECORD_BASE = 
"org.apache.avro.specific.SpecificRecordBase";
+
+       private static final String AVRO_GENERIC_RECORD = 
"org.apache.avro.generic.GenericData$Record";
+
+       private static final String AVRO_KRYO_UTILS = 
"org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils";
+
+       private static final String AVRO_GENERIC_DATA_ARRAY = 
"org.apache.avro.generic.GenericData$Array";
+
        public static void recursivelyRegisterType(TypeInformation<?> typeInfo, 
ExecutionConfig config, Set<Class<?>> alreadySeen) {
                if (typeInfo instanceof GenericTypeInfo) {
                        GenericTypeInfo<?> genericTypeInfo = 
(GenericTypeInfo<?>) typeInfo;
@@ -94,8 +103,11 @@ public class Serializers {
                }
                else {
                        config.registerKryoType(type);
-                       checkAndAddSerializerForTypeAvro(config, type);
-       
+                       // add serializers for Avro type if necessary
+                       if (hasSuperclass(type, AVRO_SPECIFIC_RECORD_BASE) || 
hasSuperclass(type, AVRO_GENERIC_RECORD)) {
+                               addAvroSerializers(config, type);
+                       }
+
                        Field[] fields = type.getDeclaredFields();
                        for (Field field : fields) {
                                if (Modifier.isStatic(field.getModifiers()) || 
Modifier.isTransient(field.getModifiers())) {
@@ -147,20 +159,54 @@ public class Serializers {
                        }
                }
        }
-       
-       // 
------------------------------------------------------------------------
-       
-       private static void checkAndAddSerializerForTypeAvro(ExecutionConfig 
reg, Class<?> type) {
-               if (GenericData.Record.class.isAssignableFrom(type) || 
SpecificRecordBase.class.isAssignableFrom(type)) {
-                       // Avro POJOs contain java.util.List which have 
GenericData.Array as their runtime type
-                       // because Kryo is not able to serialize them properly, 
we use this serializer for them
-                       
reg.registerTypeWithKryoSerializer(GenericData.Array.class, 
SpecificInstanceCollectionSerializerForArrayList.class);
-
-                       // We register this serializer for users who want to 
use untyped Avro records (GenericData.Record).
-                       // Kryo is able to serialize everything in there, 
except for the Schema.
-                       // This serializer is very slow, but using the 
GenericData.Records of Kryo is in general a bad idea.
-                       // we add the serializer as a default serializer 
because Avro is using a private sub-type at runtime.
-                       reg.addDefaultKryoSerializer(Schema.class, 
AvroSchemaSerializer.class);
+
+       /**
+        * Loads the utility class from <code>flink-avro</code> via reflection and invokes its {@code addAvroSerializers(ExecutionConfig, Class)} method; fails with a {@link RuntimeException} carrying the underlying cause if the class or the method cannot be used.
+        */
+       private static void addAvroSerializers(ExecutionConfig reg, Class<?> type) {
+               Class<?> clazz;
+               try {
+                       clazz = Class.forName(AVRO_KRYO_UTILS, false, Serializers.class.getClassLoader()); // 'false': look the class up without initializing it
+               }
+               catch (ClassNotFoundException e) {
+                       throw new RuntimeException("Could not load class '" + AVRO_KRYO_UTILS + "'. " +
+                               "You may be missing the 'flink-avro' dependency.", e); // keep the cause so the failed lookup stays visible in the stack trace
+               }
+               try {
+                       clazz.getDeclaredMethod("addAvroSerializers", ExecutionConfig.class, Class.class).invoke(null, reg, type); // null receiver: the target is invoked as a static method
+               } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+                       throw new RuntimeException("Could not access method in 'flink-avro' dependency.", e);
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       public static void 
addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> 
kryoRegistrations) { // adds one entry keyed by the Avro GenericData.Array class name to the given map
+               try {
+                       Class<?> clazz = Class.forName(AVRO_GENERIC_DATA_ARRAY, 
false, Serializers.class.getClassLoader()); // 'false': look the class up without initializing it
+
+                       kryoRegistrations.put(
+                               AVRO_GENERIC_DATA_ARRAY,
+                               new KryoRegistration(
+                                               clazz,
+                                               new 
ExecutionConfig.SerializableSerializer<>(new 
Serializers.SpecificInstanceCollectionSerializerForArrayList()))); // Avro class found: register it with the ArrayList-based collection serializer
+               }
+               catch (ClassNotFoundException e) { // Avro's GenericData.Array could not be loaded
+                       kryoRegistrations.put(AVRO_GENERIC_DATA_ARRAY,
+                               new 
KryoRegistration(DummyAvroRegisteredClass.class, (Class) 
DummyAvroKryoSerializerClass.class)); // placeholder under the same key; the dummy serializer throws UnsupportedOperationException if it is ever used
+               }
+       }
+
+       public static class DummyAvroRegisteredClass {}
+
+       public static class DummyAvroKryoSerializerClass<T> extends 
Serializer<T> {
+               @Override
+               public void write(Kryo kryo, Output output, Object o) {
+                       throw new UnsupportedOperationException("Could not find 
required Avro dependency.");
+               }
+
+               @Override
+               public T read(Kryo kryo, Input input, Class<T> aClass) {
+                       throw new UnsupportedOperationException("Could not find 
required Avro dependency.");
                }
        }
 
@@ -168,6 +214,9 @@ public class Serializers {
        // Custom Serializers
        // 
--------------------------------------------------------------------------------------------
 
+       /**
+        * Special serializer for Java's {@link ArrayList} used for Avro's 
GenericData.Array.
+        */
        @SuppressWarnings("rawtypes")
        public static class SpecificInstanceCollectionSerializerForArrayList 
extends SpecificInstanceCollectionSerializer<ArrayList> {
                private static final long serialVersionUID = 1L;
@@ -176,19 +225,19 @@ public class Serializers {
                        super(ArrayList.class);
                }
        }
+
        /**
         * Special serializer for Java collections enforcing certain instance 
types.
         * Avro is serializing collections with an "GenericData.Array" type. 
Kryo is not able to handle
         * this type, so we use ArrayLists.
         */
        @SuppressWarnings("rawtypes")
-       public static class SpecificInstanceCollectionSerializer<T extends 
Collection> 
-                       extends CollectionSerializer implements Serializable
-       {
+       public static class SpecificInstanceCollectionSerializer<T extends 
Collection>
+                       extends CollectionSerializer implements Serializable {
                private static final long serialVersionUID = 1L;
-               
+
                private Class<T> type;
-               
+
                public SpecificInstanceCollectionSerializer(Class<T> type) {
                        this.type = type;
                }
@@ -203,27 +252,4 @@ public class Serializers {
                        return kryo.newInstance(this.type);
                }
        }
-
-       /**
-        * Slow serialization approach for Avro schemas.
-        * This is only used with {{@link 
org.apache.avro.generic.GenericData.Record}} types.
-        * Having this serializer, we are able to handle avro Records.
-        */
-       public static class AvroSchemaSerializer extends Serializer<Schema> 
implements Serializable {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void write(Kryo kryo, Output output, Schema object) {
-                       String schemaAsString = object.toString(false);
-                       output.writeString(schemaAsString);
-               }
-
-               @Override
-               public Schema read(Kryo kryo, Input input, Class<Schema> type) {
-                       String schemaAsString = input.readString();
-                       // the parser seems to be stateful, to we need a new 
one for every type.
-                       Schema.Parser sParser = new Schema.Parser();
-                       return sParser.parse(schemaAsString);
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java
deleted file mode 100644
index 5b08e52..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class AvroGenericArraySerializerTest extends 
AbstractGenericArraySerializerTest {
-       @Override
-       protected <T> TypeSerializer<T> createComponentSerializer(Class<T> 
type) {
-               return new AvroSerializer<T>(type);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
deleted file mode 100644
index 19fac43..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class AvroGenericTypeComparatorTest extends 
AbstractGenericTypeComparatorTest {
-       @Override
-       protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-               return new AvroSerializer<T>(type);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
deleted file mode 100644
index df1ff60..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class AvroGenericTypeSerializerTest extends 
AbstractGenericTypeSerializerTest {
-
-       @Override
-       protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-               return new AvroSerializer<T>(type);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
deleted file mode 100644
index 8a89410..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.reflect.Nullable;
-import org.apache.flink.api.common.typeutils.SerializerTestInstance;
-import org.junit.Test;
-
-public class AvroSerializerEmptyArrayTest {
-
-       @Test
-       public void testBookSerialization() {
-               try {
-                       Book b = new Book(123, "This is a test book", 26382648);
-                       AvroSerializer<Book> serializer = new 
AvroSerializer<Book>(Book.class);
-                       SerializerTestInstance<Book> test = new 
SerializerTestInstance<Book>(serializer, Book.class, -1, b);
-                       test.testAll();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testSerialization() {
-               try {
-                       List<String> titles = new ArrayList<String>();
-
-                       List<Book> books = new ArrayList<Book>();
-                       books.add(new Book(123, "This is a test book", 1));
-                       books.add(new Book(24234234, "This is a test book", 1));
-                       books.add(new Book(1234324, "This is a test book", 3));
-
-                       BookAuthor a = new BookAuthor(1, titles, "Test Author");
-                       a.books = books;
-                       a.bookType = BookAuthor.BookType.journal;
-                       
-                       AvroSerializer<BookAuthor> serializer = new 
AvroSerializer<BookAuthor>(BookAuthor.class);
-                       
-                       SerializerTestInstance<BookAuthor> test = new 
SerializerTestInstance<BookAuthor>(serializer, BookAuthor.class, -1, a);
-                       test.testAll();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       public static class Book {
-
-               long bookId;
-               @Nullable
-               String title;
-               long authorId;
-
-               public Book() {}
-
-               public Book(long bookId, String title, long authorId) {
-                       this.bookId = bookId;
-                       this.title = title;
-                       this.authorId = authorId;
-               }
-
-               @Override
-               public int hashCode() {
-                       final int prime = 31;
-                       int result = 1;
-                       result = prime * result + (int) (authorId ^ (authorId 
>>> 32));
-                       result = prime * result + (int) (bookId ^ (bookId >>> 
32));
-                       result = prime * result + ((title == null) ? 0 : 
title.hashCode());
-                       return result;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       if (this == obj)
-                               return true;
-                       if (obj == null)
-                               return false;
-                       if (getClass() != obj.getClass())
-                               return false;
-                       Book other = (Book) obj;
-                       if (authorId != other.authorId)
-                               return false;
-                       if (bookId != other.bookId)
-                               return false;
-                       if (title == null) {
-                               if (other.title != null)
-                                       return false;
-                       } else if (!title.equals(other.title))
-                               return false;
-                       return true;
-               }
-       }
-
-       public static class BookAuthor {
-
-               enum BookType {
-                       book,
-                       article,
-                       journal
-               }
-
-               long authorId;
-
-               @Nullable
-               List<String> bookTitles;
-
-               @Nullable
-               List<Book> books;
-
-               String authorName;
-
-               BookType bookType;
-
-               public BookAuthor() {}
-
-               public BookAuthor(long authorId, List<String> bookTitles, 
String authorName) {
-                       this.authorId = authorId;
-                       this.bookTitles = bookTitles;
-                       this.authorName = authorName;
-               }
-
-               @Override
-               public int hashCode() {
-                       final int prime = 31;
-                       int result = 1;
-                       result = prime * result + (int) (authorId ^ (authorId 
>>> 32));
-                       result = prime * result + ((authorName == null) ? 0 : 
authorName.hashCode());
-                       result = prime * result + ((bookTitles == null) ? 0 : 
bookTitles.hashCode());
-                       result = prime * result + ((bookType == null) ? 0 : 
bookType.hashCode());
-                       result = prime * result + ((books == null) ? 0 : 
books.hashCode());
-                       return result;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       if (this == obj)
-                               return true;
-                       if (obj == null)
-                               return false;
-                       if (getClass() != obj.getClass())
-                               return false;
-                       BookAuthor other = (BookAuthor) obj;
-                       if (authorId != other.authorId)
-                               return false;
-                       if (authorName == null) {
-                               if (other.authorName != null)
-                                       return false;
-                       } else if (!authorName.equals(other.authorName))
-                               return false;
-                       if (bookTitles == null) {
-                               if (other.bookTitles != null)
-                                       return false;
-                       } else if (!bookTitles.equals(other.bookTitles))
-                               return false;
-                       if (bookType != other.bookType)
-                               return false;
-                       if (books == null) {
-                               if (other.books != null)
-                                       return false;
-                       } else if (!books.equals(other.books))
-                               return false;
-                       return true;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
index 5a404bd..1cacc9e 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -18,20 +18,22 @@
 
 package org.apache.flink.api.java.typeutils.runtime.kryo;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -42,6 +44,20 @@ import static org.junit.Assert.assertTrue;
  */
 public class KryoSerializerCompatibilityTest {
 
+       @Test
+       public void testMigrationStrategyForRemovedAvroDependency() throws Exception {
+               KryoSerializer<TestClass> kryoSerializerForA = new KryoSerializer<>(TestClass.class, new ExecutionConfig());
+
+               // read configuration again from bytes
+               TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot;
+               try (InputStream in = getClass().getResourceAsStream("/kryo-serializer-flink1.3-snapshot")) {
+                       kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
+                               new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+               }
+               CompatibilityResult<TestClass> compatResult = kryoSerializerForA.ensureCompatibility(kryoSerializerConfigSnapshot);
+               assertFalse(compatResult.isRequiresMigration());
+       }
+
        /**
         * Verifies that reconfiguration result is INCOMPATIBLE if data type has changed.
         */
@@ -60,7 +76,7 @@ public class KryoSerializerCompatibilityTest {
                KryoSerializer<TestClassB> kryoSerializerForB = new KryoSerializer<>(TestClassB.class, new ExecutionConfig());
 
                // read configuration again from bytes
-               try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+               try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
                        kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
                                new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
                }
@@ -103,7 +119,7 @@ public class KryoSerializerCompatibilityTest {
                kryoSerializer = new KryoSerializer<>(TestClass.class, executionConfig);
 
                // read configuration from bytes
-               try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+               try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
                        kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
                                new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/resources/kryo-serializer-flink1.3-snapshot
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/kryo-serializer-flink1.3-snapshot b/flink-core/src/test/resources/kryo-serializer-flink1.3-snapshot
new file mode 100644
index 0000000..0123a9c
Binary files /dev/null and b/flink-core/src/test/resources/kryo-serializer-flink1.3-snapshot differ

Reply via email to