http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
deleted file mode 100644
index c39db15..0000000
--- 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
+++ /dev/null
@@ -1,528 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.io.avro.generated.Address;
-import org.apache.flink.api.io.avro.generated.Colors;
-import org.apache.flink.api.io.avro.generated.Fixed16;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.util.StringUtils;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes 
for Avro serialization.
- */
-public class EncoderDecoderTest {
-       @Test
-       public void testComplexStringsDirecty() {
-               try {
-                       Random rnd = new Random(349712539451944123L);
-                       
-                       for (int i = 0; i < 10; i++) {
-                               String testString = 
StringUtils.getRandomString(rnd, 10, 100);
-                               
-                               ByteArrayOutputStream baos = new 
ByteArrayOutputStream(512);
-                               {
-                                       DataOutputStream dataOut = new 
DataOutputStream(baos);
-                                       DataOutputEncoder encoder = new 
DataOutputEncoder();
-                                       encoder.setOut(dataOut);
-                                       
-                                       encoder.writeString(testString);
-                                       dataOut.flush();
-                                       dataOut.close();
-                               }
-                               
-                               byte[] data = baos.toByteArray();
-                               
-                               // deserialize
-                               {
-                                       ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
-                                       DataInputStream dataIn = new 
DataInputStream(bais);
-                                       DataInputDecoder decoder = new 
DataInputDecoder();
-                                       decoder.setIn(dataIn);
-       
-                                       String deserialized = 
decoder.readString();
-                                       
-                                       assertEquals(testString, deserialized);
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test failed due to an exception: " + 
e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testPrimitiveTypes() {
-               
-               testObjectSerialization(new Boolean(true));
-               testObjectSerialization(new Boolean(false));
-               
-               testObjectSerialization(Byte.valueOf((byte) 0));
-               testObjectSerialization(Byte.valueOf((byte) 1));
-               testObjectSerialization(Byte.valueOf((byte) -1));
-               testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE));
-               testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE));
-               
-               testObjectSerialization(Short.valueOf((short) 0));
-               testObjectSerialization(Short.valueOf((short) 1));
-               testObjectSerialization(Short.valueOf((short) -1));
-               testObjectSerialization(Short.valueOf(Short.MIN_VALUE));
-               testObjectSerialization(Short.valueOf(Short.MAX_VALUE));
-               
-               testObjectSerialization(Integer.valueOf(0));
-               testObjectSerialization(Integer.valueOf(1));
-               testObjectSerialization(Integer.valueOf(-1));
-               testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE));
-               testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE));
-               
-               testObjectSerialization(Long.valueOf(0));
-               testObjectSerialization(Long.valueOf(1));
-               testObjectSerialization(Long.valueOf(-1));
-               testObjectSerialization(Long.valueOf(Long.MIN_VALUE));
-               testObjectSerialization(Long.valueOf(Long.MAX_VALUE));
-               
-               testObjectSerialization(Float.valueOf(0));
-               testObjectSerialization(Float.valueOf(1));
-               testObjectSerialization(Float.valueOf(-1));
-               testObjectSerialization(Float.valueOf((float)Math.E));
-               testObjectSerialization(Float.valueOf((float)Math.PI));
-               testObjectSerialization(Float.valueOf(Float.MIN_VALUE));
-               testObjectSerialization(Float.valueOf(Float.MAX_VALUE));
-               testObjectSerialization(Float.valueOf(Float.MIN_NORMAL));
-               testObjectSerialization(Float.valueOf(Float.NaN));
-               testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY));
-               testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY));
-               
-               testObjectSerialization(Double.valueOf(0));
-               testObjectSerialization(Double.valueOf(1));
-               testObjectSerialization(Double.valueOf(-1));
-               testObjectSerialization(Double.valueOf(Math.E));
-               testObjectSerialization(Double.valueOf(Math.PI));
-               testObjectSerialization(Double.valueOf(Double.MIN_VALUE));
-               testObjectSerialization(Double.valueOf(Double.MAX_VALUE));
-               testObjectSerialization(Double.valueOf(Double.MIN_NORMAL));
-               testObjectSerialization(Double.valueOf(Double.NaN));
-               
testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY));
-               
testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY));
-               
-               testObjectSerialization("");
-               testObjectSerialization("abcdefg");
-               testObjectSerialization("ab\u1535\u0155xyz\u706F");
-               
-               testObjectSerialization(new SimpleTypes(3637, 54876486548L, 
(byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523));
-               testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) 
-65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 
0.0000001));
-       }
-       
-       @Test
-       public void testArrayTypes() {
-               {
-                       int[] array = new int[] {1, 2, 3, 4, 5};
-                       testObjectSerialization(array);
-               }
-               {
-                       long[] array = new long[] {1, 2, 3, 4, 5};
-                       testObjectSerialization(array);
-               }
-               {
-                       float[] array = new float[] {1, 2, 3, 4, 5};
-                       testObjectSerialization(array);
-               }
-               {
-                       double[] array = new double[] {1, 2, 3, 4, 5};
-                       testObjectSerialization(array);
-               }
-               {
-                       String[] array = new String[] {"Oh", "my", "what", 
"do", "we", "have", "here", "?"};
-                       testObjectSerialization(array);
-               }
-       }
-       
-       @Test
-       public void testEmptyArray() {
-               {
-                       int[] array = new int[0];
-                       testObjectSerialization(array);
-               }
-               {
-                       long[] array = new long[0];
-                       testObjectSerialization(array);
-               }
-               {
-                       float[] array = new float[0];
-                       testObjectSerialization(array);
-               }
-               {
-                       double[] array = new double[0];
-                       testObjectSerialization(array);
-               }
-               {
-                       String[] array = new String[0];
-                       testObjectSerialization(array);
-               }
-       }
-       
-       @Test
-       public void testObjects() {
-               // simple object containing only primitives
-               {
-                       testObjectSerialization(new Book(976243875L, "The 
Serialization Odysse", 42));
-               }
-               
-               // object with collection
-               {
-                       ArrayList<String> list = new ArrayList<String>();
-                       list.add("A");
-                       list.add("B");
-                       list.add("C");
-                       list.add("D");
-                       list.add("E");
-                       
-                       testObjectSerialization(new BookAuthor(976243875L, 
list, "Arno Nym"));
-               }
-               
-               // object with empty collection
-               {
-                       ArrayList<String> list = new ArrayList<String>();
-                       testObjectSerialization(new BookAuthor(987654321L, 
list, "The Saurus"));
-               }
-       }
-       
-       @Test
-       public void testNestedObjectsWithCollections() {
-               testObjectSerialization(new ComplexNestedObject2(true));
-       }
-       
-       @Test
-       public void testGeneratedObjectWithNullableFields() {
-               List<CharSequence> strings = Arrays.asList(new CharSequence[] { 
"These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", 
"sequence" });
-               List<Boolean> bools = Arrays.asList(true, true, false, false, 
true, false, true, true);
-               Map<CharSequence, Long> map = new HashMap<CharSequence, Long>();
-               map.put("1", 1L);
-               map.put("2", 2L);
-               map.put("3", 3L);
-
-               byte[] b = new byte[16];
-               new Random().nextBytes(b);
-               Fixed16 f = new Fixed16(b);
-               Address addr = new Address(new Integer(239), "6th Main", 
"Bangalore",
-                               "Karnataka", "560075");
-               User user = new User("Freudenreich", 1337, "macintosh gray",
-                               1234567890L, 3.1415926, null, true, strings, 
bools, null,
-                               Colors.GREEN, map, f, new Boolean(true), addr);
-               
-               testObjectSerialization(user);
-       }
-       
-       @Test
-       public void testVarLenCountEncoding() {
-               try {
-                       long[] values = new long[] { 0, 1, 2, 3, 4, 0, 574, 
45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL};
-                       
-                       // write
-                       ByteArrayOutputStream baos = new 
ByteArrayOutputStream(512);
-                       {
-                               DataOutputStream dataOut = new 
DataOutputStream(baos);
-                               
-                               for (long val : values) {
-                                       
DataOutputEncoder.writeVarLongCount(dataOut, val);
-                               }
-                               
-                               dataOut.flush();
-                               dataOut.close();
-                       }
-                       
-                       // read
-                       {
-                               ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
-                               DataInputStream dataIn = new 
DataInputStream(bais);
-                               
-                               for (long val : values) {
-                                       long read = 
DataInputDecoder.readVarLongCount(dataIn);
-                                       assertEquals("Wrong var-len encoded 
value read.", val, read);
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test failed due to an exception: " + 
e.getMessage());
-               }
-       }
-       
-       private static <X> void testObjectSerialization(X obj) {
-               
-               try {
-                       
-                       // serialize
-                       ByteArrayOutputStream baos = new 
ByteArrayOutputStream(512);
-                       {
-                               DataOutputStream dataOut = new 
DataOutputStream(baos);
-                               DataOutputEncoder encoder = new 
DataOutputEncoder();
-                               encoder.setOut(dataOut);
-                               
-                               @SuppressWarnings("unchecked")
-                               Class<X> clazz = (Class<X>) obj.getClass();
-                               ReflectDatumWriter<X> writer = new 
ReflectDatumWriter<X>(clazz);
-                               
-                               writer.write(obj, encoder);
-                               dataOut.flush();
-                               dataOut.close();
-                       }
-                       
-                       byte[] data = baos.toByteArray();
-                       X result = null;
-                       
-                       // deserialize
-                       {
-                               ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
-                               DataInputStream dataIn = new 
DataInputStream(bais);
-                               DataInputDecoder decoder = new 
DataInputDecoder();
-                               decoder.setIn(dataIn);
-
-                               @SuppressWarnings("unchecked")
-                               Class<X> clazz = (Class<X>) obj.getClass();
-                               ReflectDatumReader<X> reader = new 
ReflectDatumReader<X>(clazz);
-                               
-                               // create a reuse object if possible, otherwise 
we have no reuse object 
-                               X reuse = null;
-                               try {
-                                       @SuppressWarnings("unchecked")
-                                       X test = (X) 
obj.getClass().newInstance();
-                                       reuse = test;
-                               } catch (Throwable t) {}
-                               
-                               result = reader.read(reuse, decoder);
-                       }
-                       
-                       // check
-                       final String message = "Deserialized object is not the 
same as the original";
-                       
-                       if (obj.getClass().isArray()) {
-                               Class<?> clazz = obj.getClass();
-                               if (clazz == byte[].class) {
-                                       assertArrayEquals(message, (byte[]) 
obj, (byte[]) result);
-                               }
-                               else if (clazz == short[].class) {
-                                       assertArrayEquals(message, (short[]) 
obj, (short[]) result);
-                               }
-                               else if (clazz == int[].class) {
-                                       assertArrayEquals(message, (int[]) obj, 
(int[]) result);
-                               }
-                               else if (clazz == long[].class) {
-                                       assertArrayEquals(message, (long[]) 
obj, (long[]) result);
-                               }
-                               else if (clazz == char[].class) {
-                                       assertArrayEquals(message, (char[]) 
obj, (char[]) result);
-                               }
-                               else if (clazz == float[].class) {
-                                       assertArrayEquals(message, (float[]) 
obj, (float[]) result, 0.0f);
-                               }
-                               else if (clazz == double[].class) {
-                                       assertArrayEquals(message, (double[]) 
obj, (double[]) result, 0.0);
-                               } else {
-                                       assertArrayEquals(message, (Object[]) 
obj, (Object[]) result);
-                               }
-                       } else {
-                               assertEquals(message, obj, result);
-                       }
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail("Test failed due to an exception: " + 
e.getMessage());
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  Test Objects
-       // 
--------------------------------------------------------------------------------------------
-
-
-       public static final class SimpleTypes {
-               
-               private final int iVal;
-               private final long lVal;
-               private final byte bVal;
-               private final String sVal;
-               private final short rVal;
-               private final double dVal;
-               
-               
-               public SimpleTypes() {
-                       this(0, 0, (byte) 0, "", (short) 0, 0);
-               }
-               
-               public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, 
short rVal, double dVal) {
-                       this.iVal = iVal;
-                       this.lVal = lVal;
-                       this.bVal = bVal;
-                       this.sVal = sVal;
-                       this.rVal = rVal;
-                       this.dVal = dVal;
-               }
-               
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj.getClass() == SimpleTypes.class) {
-                               SimpleTypes other = (SimpleTypes) obj;
-                               
-                               return other.iVal == this.iVal &&
-                                               other.lVal == this.lVal &&
-                                               other.bVal == this.bVal &&
-                                               other.sVal.equals(this.sVal) &&
-                                               other.rVal == this.rVal &&
-                                               other.dVal == this.dVal;
-                               
-                       } else {
-                               return false;
-                       }
-               }
-       }
-       
-       public static class ComplexNestedObject1 {
-               
-               private double doubleValue;
-               
-               private List<String> stringList;
-               
-               public ComplexNestedObject1() {}
-               
-               public ComplexNestedObject1(int offInit) {
-                       this.doubleValue = 6293485.6723 + offInit;
-                               
-                       this.stringList = new ArrayList<String>();
-                       this.stringList.add("A" + offInit);
-                       this.stringList.add("somewhat" + offInit);
-                       this.stringList.add("random" + offInit);
-                       this.stringList.add("collection" + offInit);
-                       this.stringList.add("of" + offInit);
-                       this.stringList.add("strings" + offInit);
-               }
-               
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj.getClass() == ComplexNestedObject1.class) {
-                               ComplexNestedObject1 other = 
(ComplexNestedObject1) obj;
-                               return other.doubleValue == this.doubleValue && 
this.stringList.equals(other.stringList);
-                       } else {
-                               return false;
-                       }
-               }
-       }
-       
-       public static class ComplexNestedObject2 {
-               
-               private long longValue;
-               
-               private Map<String, ComplexNestedObject1> theMap;
-               
-               public ComplexNestedObject2() {}
-               
-               public ComplexNestedObject2(boolean init) {
-                       this.longValue = 46547;
-                               
-                       this.theMap = new HashMap<String, 
ComplexNestedObject1>();
-                       this.theMap.put("36354L", new 
ComplexNestedObject1(43546543));
-                       this.theMap.put("785611L", new 
ComplexNestedObject1(45784568));
-                       this.theMap.put("43L", new 
ComplexNestedObject1(9876543));
-                       this.theMap.put("-45687L", new 
ComplexNestedObject1(7897615));
-                       this.theMap.put("1919876876896L", new 
ComplexNestedObject1(27154));
-                       this.theMap.put("-868468468L", new 
ComplexNestedObject1(546435));
-               }
-               
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj.getClass() == ComplexNestedObject2.class) {
-                               ComplexNestedObject2 other = 
(ComplexNestedObject2) obj;
-                               return other.longValue == this.longValue && 
this.theMap.equals(other.theMap);
-                       } else {
-                               return false;
-                       }
-               }
-       }
-       
-       public static class Book {
-
-               private long bookId;
-               private String title;
-               private long authorId;
-
-               public Book() {}
-
-               public Book(long bookId, String title, long authorId) {
-                       this.bookId = bookId;
-                       this.title = title;
-                       this.authorId = authorId;
-               }
-               
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj.getClass() == Book.class) {
-                               Book other = (Book) obj;
-                               return other.bookId == this.bookId && 
other.authorId == this.authorId && this.title.equals(other.title);
-                       } else {
-                               return false;
-                       }
-               }
-       }
-
-       public static class BookAuthor {
-
-               private long authorId;
-               private List<String> bookTitles;
-               private String authorName;
-
-               public BookAuthor() {}
-
-               public BookAuthor(long authorId, List<String> bookTitles, 
String authorName) {
-                       this.authorId = authorId;
-                       this.bookTitles = bookTitles;
-                       this.authorName = authorName;
-               }
-               
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj.getClass() == BookAuthor.class) {
-                               BookAuthor other = (BookAuthor) obj;
-                               return other.authorName.equals(this.authorName) 
&& other.authorId == this.authorId &&
-                                               
other.bookTitles.equals(this.bookTitles);
-                       } else {
-                               return false;
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
deleted file mode 100644
index 1174786..0000000
--- 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.avro.testjar;
-
-// 
================================================================================================
-//  This file defines the classes for the AvroExternalJarProgramITCase.
-//  The program is exported into src/test/resources/AvroTestProgram.jar.
-//
-//  THIS FILE MUST STAY FULLY COMMENTED SUCH THAT THE HERE DEFINED CLASSES ARE 
NOT COMPILED
-//  AND ADDED TO THE test-classes DIRECTORY. OTHERWISE, THE EXTERNAL CLASS 
LOADING WILL
-//  NOT BE COVERED BY THIS TEST.
-// 
================================================================================================
-
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.core.fs.Path;
-
-public class AvroExternalJarProgram  {
-
-       public static final class Color {
-               
-               private String name;
-               private double saturation;
-               
-               public Color() {
-                       name = "";
-                       saturation = 1.0;
-               }
-               
-               public Color(String name, double saturation) {
-                       this.name = name;
-                       this.saturation = saturation;
-               }
-               
-               public String getName() {
-                       return name;
-               }
-               
-               public void setName(String name) {
-                       this.name = name;
-               }
-               
-               public double getSaturation() {
-                       return saturation;
-               }
-               
-               public void setSaturation(double saturation) {
-                       this.saturation = saturation;
-               }
-               
-               @Override
-               public String toString() {
-                       return name + '(' + saturation + ')';
-               }
-       }
-       
-       public static final class MyUser {
-               
-               private String name;
-               private List<Color> colors;
-               
-               public MyUser() {
-                       name = "unknown";
-                       colors = new ArrayList<Color>();
-               }
-               
-               public MyUser(String name, List<Color> colors) {
-                       this.name = name;
-                       this.colors = colors;
-               }
-               
-               public String getName() {
-                       return name;
-               }
-               
-               public List<Color> getColors() {
-                       return colors;
-               }
-               
-               public void setName(String name) {
-                       this.name = name;
-               }
-               
-               public void setColors(List<Color> colors) {
-                       this.colors = colors;
-               }
-               
-               @Override
-               public String toString() {
-                       return name + " : " + colors;
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static final class NameExtractor extends RichMapFunction<MyUser, 
Tuple2<String, MyUser>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Tuple2<String, MyUser> map(MyUser u) {
-                       String namePrefix = u.getName().substring(0, 1);
-                       return new Tuple2<String, MyUser>(namePrefix, u);
-               }
-       }
-       
-       public static final class NameGrouper extends 
RichReduceFunction<Tuple2<String, MyUser>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> 
val1, Tuple2<String, MyUser> val2) {
-                       return val1;
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Test Data
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static final class Generator {
-               
-               private final Random rnd = new Random(2389756789345689276L);
-               
-               public MyUser nextUser() {
-                       return randomUser();
-               }
-               
-               private MyUser randomUser() {
-                       
-                       int numColors = rnd.nextInt(5);
-                       ArrayList<Color> colors = new 
ArrayList<Color>(numColors);
-                       for (int i = 0; i < numColors; i++) {
-                               colors.add(new Color(randomString(), 
rnd.nextDouble()));
-                       }
-                       
-                       return new MyUser(randomString(), colors);
-               }
-               
-               private String randomString() {
-                       char[] c = new char[this.rnd.nextInt(20) + 5];
-                       
-                       for (int i = 0; i < c.length; i++) {
-                               c[i] = (char) (this.rnd.nextInt(150) + 40);
-                       }
-                       
-                       return new String(c);
-               }
-       }
-       
-       public static void writeTestData(File testFile, int numRecords) throws 
IOException {
-               
-               DatumWriter<MyUser> userDatumWriter = new 
ReflectDatumWriter<MyUser>(MyUser.class);
-               DataFileWriter<MyUser> dataFileWriter = new 
DataFileWriter<MyUser>(userDatumWriter);
-               
-               
dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);
-               
-               
-               Generator generator = new Generator();
-               
-               for (int i = 0; i < numRecords; i++) {
-                       MyUser user = generator.nextUser();
-                       dataFileWriter.append(user);
-               }
-               
-               dataFileWriter.close();
-       }
-
-//     public static void main(String[] args) throws Exception {
-//             String testDataFile = new 
File("src/test/resources/testdata.avro").getAbsolutePath();
-//             writeTestData(new File(testDataFile), 50);
-//     }
-       
-       public static void main(String[] args) throws Exception {
-               String inputPath = args[0];
-               
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               DataSet<MyUser> input = env.createInput(new 
AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class));
-       
-               DataSet<Tuple2<String, MyUser>> result = input.map(new 
NameExtractor()).groupBy(0).reduce(new NameGrouper());
-               
-               result.output(new 
DiscardingOutputFormat<Tuple2<String,MyUser>>());
-               env.execute();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
deleted file mode 100644
index f33f433..0000000
--- 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.io.avro;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-@RunWith(Parameterized.class)
-public class AvroPojoTest extends MultipleProgramsTestBase {
-       public AvroPojoTest(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       private File inFile;
-       private String resultPath;
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-               inFile = tempFolder.newFile();
-               AvroRecordInputFormatTest.writeTestFile(inFile);
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testSimpleAvroRead() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, 
User.class);
-               DataSet<User> usersDS = env.createInput(users)
-                               // null map type because the order changes in 
different JVMs (hard to test)
-               .map(new MapFunction<User, User>() {
-                       @Override
-                       public User map(User value) throws Exception {
-                               value.setTypeMap(null);
-                               return value;
-                       }
-               });
-
-               usersDS.writeAsText(resultPath);
-
-               env.execute("Simple Avro read job");
-
-
-               expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, 
\"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 
123.45, \"type_null_test\": null, \"type_bool_test\": true, 
\"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": 
[true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", 
\"type_map\": null, \"type_fixed\": null, \"type_union\": null, 
\"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": 
\"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
-                                       "{\"name\": \"Charlie\", 
\"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 
1337, \"type_double_test\": 1.337, \"type_null_test\": null, 
\"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": 
[], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, 
\"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, 
\"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", 
\"zip\": \"NW1 6XE\"}}\n";
-       }
-
-       @Test
-       public void testSerializeWithAvro() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableForceAvro();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, 
User.class);
-               DataSet<User> usersDS = env.createInput(users)
-                               // null map type because the order changes in 
different JVMs (hard to test)
-                               .map(new MapFunction<User, User>() {
-                                       @Override
-                                       public User map(User value) throws 
Exception {
-                                               Map<CharSequence, Long> ab = 
new HashMap<CharSequence, Long>(1);
-                                               ab.put("hehe", 12L);
-                                               value.setTypeMap(ab);
-                                               return value;
-                                       }
-                               });
-
-               usersDS.writeAsText(resultPath);
-
-               env.execute("Simple Avro read job");
-
-
-               expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, 
\"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 
123.45, \"type_null_test\": null, \"type_bool_test\": true, 
\"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": 
[true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", 
\"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, 
\"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": 
\"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
-                                       "{\"name\": \"Charlie\", 
\"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 
1337, \"type_double_test\": 1.337, \"type_null_test\": null, 
\"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": 
[], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": 
{\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": 
{\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": 
\"London\", \"zip\": \"NW1 6XE\"}}\n";
-
-       }
-
-       @Test
-       public void testKeySelection() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableObjectReuse();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, 
User.class);
-               DataSet<User> usersDS = env.createInput(users);
-
-               DataSet<Tuple2<String, Integer>> res = 
usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, 
Tuple2<String, Integer>>() {
-                       @Override
-                       public void reduce(Iterable<User> values, 
Collector<Tuple2<String, Integer>> out) throws Exception {
-                               for (User u : values) {
-                                       out.collect(new Tuple2<String, 
Integer>(u.getName().toString(), 1));
-                               }
-                       }
-               });
-               res.writeAsText(resultPath);
-               env.execute("Avro Key selection");
-
-
-               expected = "(Alyssa,1)\n(Charlie,1)\n";
-       }
-
-       @Test
-       public void testWithAvroGenericSer() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableForceAvro();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, 
User.class);
-               DataSet<User> usersDS = env.createInput(users);
-
-               DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new 
KeySelector<User, String>() {
-                       @Override
-                       public String getKey(User value) throws Exception {
-                               return String.valueOf(value.getName());
-                       }
-               }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, 
Integer>>() {
-                       @Override
-                       public void reduce(Iterable<User> values, 
Collector<Tuple2<String, Integer>> out) throws Exception {
-                               for(User u : values) {
-                                       out.collect(new Tuple2<String, 
Integer>(u.getName().toString(), 1));
-                               }
-                       }
-               });
-
-               res.writeAsText(resultPath);
-               env.execute("Avro Key selection");
-
-
-               expected = "(Charlie,1)\n(Alyssa,1)\n";
-       }
-
-       @Test
-       public void testWithKryoGenericSer() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableForceKryo();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, 
User.class);
-               DataSet<User> usersDS = env.createInput(users);
-
-               DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new 
KeySelector<User, String>() {
-                       @Override
-                       public String getKey(User value) throws Exception {
-                               return String.valueOf(value.getName());
-                       }
-               }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, 
Integer>>() {
-                       @Override
-                       public void reduce(Iterable<User> values, 
Collector<Tuple2<String, Integer>> out) throws Exception {
-                               for (User u : values) {
-                                       out.collect(new Tuple2<String, 
Integer>(u.getName().toString(), 1));
-                               }
-                       }
-               });
-
-               res.writeAsText(resultPath);
-               env.execute("Avro Key selection");
-
-
-               expected = "(Charlie,1)\n(Alyssa,1)\n";
-       }
-
-       /**
-        * Test some know fields for grouping on
-        */
-       @Test
-       public void testAllFields() throws Exception {
-               for(String fieldName : Arrays.asList("name", "type_enum", 
"type_double_test")) {
-                       testField(fieldName);
-               }
-       }
-
-       private void testField(final String fieldName) throws Exception {
-               before();
-
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, 
User.class);
-               DataSet<User> usersDS = env.createInput(users);
-
-               DataSet<Object> res = 
usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
-                       @Override
-                       public void reduce(Iterable<User> values, 
Collector<Object> out) throws Exception {
-                               for(User u : values) {
-                                       out.collect(u.get(fieldName));
-                               }
-                       }
-               });
-               res.writeAsText(resultPath);
-               env.execute("Simple Avro read job");
-
-               // test if automatic registration of the Types worked
-               ExecutionConfig ec = env.getConfig();
-               
Assert.assertTrue(ec.getRegisteredKryoTypes().contains(org.apache.flink.api.io.avro.generated.Fixed16.class));
-
-               if(fieldName.equals("name")) {
-                       expected = "Alyssa\nCharlie";
-               } else if(fieldName.equals("type_enum")) {
-                       expected = "GREEN\nRED\n";
-               } else if(fieldName.equals("type_double_test")) {
-                       expected = "123.45\n1.337\n";
-               } else {
-                       Assert.fail("Unknown field");
-               }
-
-               after();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
deleted file mode 100644
index 91a9612..0000000
--- 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.io.avro;
-
-import static org.junit.Assert.*;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.avro.util.Utf8;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.io.avro.generated.Address;
-import org.apache.flink.api.io.avro.generated.Colors;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.typeutils.AvroTypeInfo;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test the avro input format.
- * (The testcase is mostly the getting started tutorial of avro)
- * http://avro.apache.org/docs/current/gettingstartedjava.html
- */
-public class AvroRecordInputFormatTest {
-       
-       public File testFile;
-       
-       final static String TEST_NAME = "Alyssa";
-       
-       final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
-       final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
-       
-       final static boolean TEST_ARRAY_BOOLEAN_1 = true;
-       final static boolean TEST_ARRAY_BOOLEAN_2 = false;
-       
-       final static Colors TEST_ENUM_COLOR = Colors.GREEN;
-       
-       final static String TEST_MAP_KEY1 = "KEY 1";
-       final static long TEST_MAP_VALUE1 = 8546456L;
-       final static String TEST_MAP_KEY2 = "KEY 2";
-       final static long TEST_MAP_VALUE2 = 17554L;
-       
-       final static int TEST_NUM = 239;
-       final static String TEST_STREET = "Baker Street";
-       final static String TEST_CITY = "London";
-       final static String TEST_STATE = "London";
-       final static String TEST_ZIP = "NW1 6XE";
-       
-
-       private Schema userSchema = new User().getSchema();
-
-
-       public static void writeTestFile(File testFile) throws IOException {
-               ArrayList<CharSequence> stringArray = new 
ArrayList<CharSequence>();
-               stringArray.add(TEST_ARRAY_STRING_1);
-               stringArray.add(TEST_ARRAY_STRING_2);
-
-               ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
-               booleanArray.add(TEST_ARRAY_BOOLEAN_1);
-               booleanArray.add(TEST_ARRAY_BOOLEAN_2);
-
-               HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, 
Long>();
-               longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
-               longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
-               
-               Address addr = new Address();
-               addr.setNum(TEST_NUM);
-               addr.setStreet(TEST_STREET);
-               addr.setCity(TEST_CITY);
-               addr.setState(TEST_STATE);
-               addr.setZip(TEST_ZIP);
-
-
-               User user1 = new User();
-
-               user1.setName(TEST_NAME);
-               user1.setFavoriteNumber(256);
-               user1.setTypeDoubleTest(123.45d);
-               user1.setTypeBoolTest(true);
-               user1.setTypeArrayString(stringArray);
-               user1.setTypeArrayBoolean(booleanArray);
-               user1.setTypeEnum(TEST_ENUM_COLOR);
-               user1.setTypeMap(longMap);
-               user1.setTypeNested(addr);
-
-               // Construct via builder
-               User user2 = User.newBuilder()
-                               .setName("Charlie")
-                               .setFavoriteColor("blue")
-                               .setFavoriteNumber(null)
-                               .setTypeBoolTest(false)
-                               .setTypeDoubleTest(1.337d)
-                               .setTypeNullTest(null)
-                               .setTypeLongTest(1337L)
-                               .setTypeArrayString(new 
ArrayList<CharSequence>())
-                               .setTypeArrayBoolean(new ArrayList<Boolean>())
-                               .setTypeNullableArray(null)
-                               .setTypeEnum(Colors.RED)
-                               .setTypeMap(new HashMap<CharSequence, Long>())
-                               .setTypeFixed(null)
-                               .setTypeUnion(null)
-                               .setTypeNested(
-                                               
Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
-                                                               
.setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
-                                                               .build())
-                               .build();
-               DatumWriter<User> userDatumWriter = new 
SpecificDatumWriter<User>(User.class);
-               DataFileWriter<User> dataFileWriter = new 
DataFileWriter<User>(userDatumWriter);
-               dataFileWriter.create(user1.getSchema(), testFile);
-               dataFileWriter.append(user1);
-               dataFileWriter.append(user2);
-               dataFileWriter.close();
-       }
-       @Before
-       public void createFiles() throws IOException {
-               testFile = File.createTempFile("AvroInputFormatTest", null);
-               writeTestFile(testFile);
-       }
-
-
-       /**
-        * Test if the AvroInputFormat is able to properly read data from an 
avro file.
-        * @throws IOException
-        */
-       @Test
-       public void testDeserialisation() throws IOException {
-               Configuration parameters = new Configuration();
-               
-               AvroInputFormat<User> format = new AvroInputFormat<User>(new 
Path(testFile.getAbsolutePath()), User.class);
-               
-               format.configure(parameters);
-               FileInputSplit[] splits = format.createInputSplits(1);
-               assertEquals(splits.length, 1);
-               format.open(splits[0]);
-               
-               User u = format.nextRecord(null);
-               assertNotNull(u);
-               
-               String name = u.getName().toString();
-               assertNotNull("empty record", name);
-               assertEquals("name not equal", TEST_NAME, name);
-               
-               // check arrays
-               List<CharSequence> sl = u.getTypeArrayString();
-               assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, 
sl.get(0).toString());
-               assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, 
sl.get(1).toString());
-               
-               List<Boolean> bl = u.getTypeArrayBoolean();
-               assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, 
bl.get(0));
-               assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, 
bl.get(1));
-               
-               // check enums
-               Colors enumValue = u.getTypeEnum();
-               assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
-               
-               // check maps
-               Map<CharSequence, Long> lm = u.getTypeMap();
-               assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, 
lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
-               assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, 
lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
-               
-               assertFalse("expecting second element", format.reachedEnd());
-               assertNotNull("expecting second element", format.nextRecord(u));
-               
-               assertNull(format.nextRecord(u));
-               assertTrue(format.reachedEnd());
-               
-               format.close();
-       }
-
-       /**
-        * Test if the AvroInputFormat is able to properly read data from an 
avro file.
-        * @throws IOException
-        */
-       @Test
-       public void testDeserialisationReuseAvroRecordFalse() throws 
IOException {
-               Configuration parameters = new Configuration();
-               
-               AvroInputFormat<User> format = new AvroInputFormat<User>(new 
Path(testFile.getAbsolutePath()), User.class);
-               format.setReuseAvroValue(false);
-               
-               format.configure(parameters);
-               FileInputSplit[] splits = format.createInputSplits(1);
-               assertEquals(splits.length, 1);
-               format.open(splits[0]);
-               
-               User u = format.nextRecord(null);
-               assertNotNull(u);
-               
-               String name = u.getName().toString();
-               assertNotNull("empty record", name);
-               assertEquals("name not equal", TEST_NAME, name);
-               
-               // check arrays
-               List<CharSequence> sl = u.getTypeArrayString();
-               assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, 
sl.get(0).toString());
-               assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, 
sl.get(1).toString());
-               
-               List<Boolean> bl = u.getTypeArrayBoolean();
-               assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, 
bl.get(0));
-               assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, 
bl.get(1));
-               
-               // check enums
-               Colors enumValue = u.getTypeEnum();
-               assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
-               
-               // check maps
-               Map<CharSequence, Long> lm = u.getTypeMap();
-               assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, 
lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
-               assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, 
lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
-               
-               assertFalse("expecting second element", format.reachedEnd());
-               assertNotNull("expecting second element", format.nextRecord(u));
-               
-               assertNull(format.nextRecord(u));
-               assertTrue(format.reachedEnd());
-               
-               format.close();
-       }
-
-       /**
-        * Test if the Flink serialization is able to properly process 
GenericData.Record types.
-        * Usually users of Avro generate classes (POJOs) from Avro schemas.
-        * However, if generated classes are not available, one can also use 
GenericData.Record.
-        * It is an untyped key-value record which is using a schema to 
validate the correctness of the data.
-        *
-        * It is not recommended to use GenericData.Record with Flink. Use 
generated POJOs instead.
-        */
-       @Test
-       public void testDeserializeToGenericType() throws IOException {
-               DatumReader<GenericData.Record> datumReader = new 
GenericDatumReader<>(userSchema);
-
-               try (FileReader<GenericData.Record> dataFileReader = 
DataFileReader.openReader(testFile, datumReader)) {
-                       // initialize Record by reading it from disk (thats 
easier than creating it by hand)
-                       GenericData.Record rec = new 
GenericData.Record(userSchema);
-                       dataFileReader.next(rec);
-                       
-                       // check if record has been read correctly
-                       assertNotNull(rec);
-                       assertEquals("name not equal", TEST_NAME, 
rec.get("name").toString());
-                       assertEquals("enum not equal", 
TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
-                       assertEquals(null, rec.get("type_long_test")); // it is 
null for the first record.
-
-                       // now serialize it with our framework:
-                       TypeInformation<GenericData.Record> te = 
TypeExtractor.createTypeInfo(GenericData.Record.class);
-
-                       ExecutionConfig ec = new ExecutionConfig();
-                       Assert.assertEquals(GenericTypeInfo.class, 
te.getClass());
-                       
-                       Serializers.recursivelyRegisterType(te.getTypeClass(), 
ec, new HashSet<Class<?>>());
-
-                       TypeSerializer<GenericData.Record> tser = 
te.createSerializer(ec);
-                       Assert.assertEquals(1, 
ec.getDefaultKryoSerializerClasses().size());
-                       Assert.assertTrue(
-                                       
ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) &&
-                                                       
ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class));
-
-                       ByteArrayOutputStream out = new ByteArrayOutputStream();
-                       try (DataOutputViewStreamWrapper outView = new 
DataOutputViewStreamWrapper(out)) {
-                               tser.serialize(rec, outView);
-                       }
-
-                       GenericData.Record newRec;
-                       try (DataInputViewStreamWrapper inView = new 
DataInputViewStreamWrapper(
-                                       new 
ByteArrayInputStream(out.toByteArray())))
-                       {
-                               newRec = tser.deserialize(inView);
-                       }
-
-                       // check if it is still the same
-                       assertNotNull(newRec);
-                       assertEquals("enum not equal", 
TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
-                       assertEquals("name not equal", TEST_NAME, 
newRec.get("name").toString());
-                       assertEquals(null, newRec.get("type_long_test"));
-               }
-       }
-               
-       /**
-        * This test validates proper serialization with specific (generated 
POJO) types.
-        */
-       @Test
-       public void testDeserializeToSpecificType() throws IOException {
-
-               DatumReader<User> datumReader = new 
SpecificDatumReader<User>(userSchema);
-
-               try (FileReader<User> dataFileReader = 
DataFileReader.openReader(testFile, datumReader)) {
-                       User rec = dataFileReader.next();
-
-                       // check if record has been read correctly
-                       assertNotNull(rec);
-                       assertEquals("name not equal", TEST_NAME, 
rec.get("name").toString());
-                       assertEquals("enum not equal", 
TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
-
-                       // now serialize it with our framework:
-                       ExecutionConfig ec = new ExecutionConfig();
-                       TypeInformation<User> te = 
TypeExtractor.createTypeInfo(User.class);
-
-                       Assert.assertEquals(AvroTypeInfo.class, te.getClass());
-                       TypeSerializer<User> tser = te.createSerializer(ec);
-
-                       ByteArrayOutputStream out = new ByteArrayOutputStream();
-                       try (DataOutputViewStreamWrapper outView = new 
DataOutputViewStreamWrapper(out)) {
-                               tser.serialize(rec, outView);
-                       }
-
-                       User newRec;
-                       try (DataInputViewStreamWrapper inView = new 
DataInputViewStreamWrapper(
-                                       new 
ByteArrayInputStream(out.toByteArray())))
-                       {
-                               newRec = tser.deserialize(inView);
-                       }
-
-                       // check if it is still the same
-                       assertNotNull(newRec);
-                       assertEquals("name not equal", TEST_NAME, 
newRec.getName().toString());
-                       assertEquals("enum not equal", 
TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString());
-               }
-       }
-
-       /**
-        * Test if the AvroInputFormat is able to properly read data from an 
Avro
-        * file as a GenericRecord.
-        * 
-        * @throws IOException,
-        *             if there is an exception
-        */
-       @Test
-       public void testDeserialisationGenericRecord() throws IOException {
-               Configuration parameters = new Configuration();
-
-               AvroInputFormat<GenericRecord> format = new 
AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
-                               GenericRecord.class);
-
-               doTestDeserializationGenericRecord(format, parameters);
-       }
-
-       /**
-        * Helper method to test GenericRecord serialisation
-        * 
-        * @param format
-        *            the format to test
-        * @param parameters
-        *            the configuration to use
-        * @throws IOException
-        *             thrown id there is a issue
-        */
-       @SuppressWarnings("unchecked")
-       private void doTestDeserializationGenericRecord(final 
AvroInputFormat<GenericRecord> format,
-                       final Configuration parameters) throws IOException {
-               try {
-                       format.configure(parameters);
-                       FileInputSplit[] splits = format.createInputSplits(1);
-                       assertEquals(splits.length, 1);
-                       format.open(splits[0]);
-
-                       GenericRecord u = format.nextRecord(null);
-                       assertNotNull(u);
-                       assertEquals("The schemas should be equal", userSchema, 
u.getSchema());
-
-                       String name = u.get("name").toString();
-                       assertNotNull("empty record", name);
-                       assertEquals("name not equal", TEST_NAME, name);
-
-                       // check arrays
-                       List<CharSequence> sl = (List<CharSequence>) 
u.get("type_array_string");
-                       assertEquals("element 0 not equal", 
TEST_ARRAY_STRING_1, sl.get(0).toString());
-                       assertEquals("element 1 not equal", 
TEST_ARRAY_STRING_2, sl.get(1).toString());
-
-                       List<Boolean> bl = (List<Boolean>) 
u.get("type_array_boolean");
-                       assertEquals("element 0 not equal", 
TEST_ARRAY_BOOLEAN_1, bl.get(0));
-                       assertEquals("element 1 not equal", 
TEST_ARRAY_BOOLEAN_2, bl.get(1));
-
-                       // check enums
-                       GenericData.EnumSymbol enumValue = 
(GenericData.EnumSymbol) u.get("type_enum");
-                       assertEquals("enum not equal", 
TEST_ENUM_COLOR.toString(), enumValue.toString());
-
-                       // check maps
-                       Map<CharSequence, Long> lm = (Map<CharSequence, Long>) 
u.get("type_map");
-                       assertEquals("map value of key 1 not equal", 
TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
-                       assertEquals("map value of key 2 not equal", 
TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
-
-                       assertFalse("expecting second element", 
format.reachedEnd());
-                       assertNotNull("expecting second element", 
format.nextRecord(u));
-
-                       assertNull(format.nextRecord(u));
-                       assertTrue(format.reachedEnd());
-               } finally {
-                       format.close();
-               }
-       }
-
-       /**
-        * Test if the AvroInputFormat is able to properly read data from an 
avro
-        * file as a GenericRecord
-        * 
-        * @throws IOException,
-        *             if there is an error
-        */
-       @Test
-       public void testDeserialisationGenericRecordReuseAvroValueFalse() 
throws IOException {
-               Configuration parameters = new Configuration();
-
-               AvroInputFormat<GenericRecord> format = new 
AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
-                               GenericRecord.class);
-               format.configure(parameters);
-               format.setReuseAvroValue(false);
-
-               doTestDeserializationGenericRecord(format, parameters);
-       }
-
-       @After
-       public void deleteFiles() {
-               testFile.delete();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
deleted file mode 100644
index 37a83d1..0000000
--- 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.io.avro;
-
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flink.api.io.avro.generated.Address;
-import org.apache.flink.api.io.avro.generated.Colors;
-import org.apache.flink.api.io.avro.generated.Fixed16;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test the avro input format.
- * (The testcase is mostly the getting started tutorial of avro)
- * http://avro.apache.org/docs/current/gettingstartedjava.html
- */
-public class AvroSplittableInputFormatTest {
-       
-       private File testFile;
-       
-       final static String TEST_NAME = "Alyssa";
-       
-       final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
-       final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
-       
-       final static boolean TEST_ARRAY_BOOLEAN_1 = true;
-       final static boolean TEST_ARRAY_BOOLEAN_2 = false;
-       
-       final static Colors TEST_ENUM_COLOR = Colors.GREEN;
-       
-       final static String TEST_MAP_KEY1 = "KEY 1";
-       final static long TEST_MAP_VALUE1 = 8546456L;
-       final static String TEST_MAP_KEY2 = "KEY 2";
-       final static long TEST_MAP_VALUE2 = 17554L;
-
-       final static Integer TEST_NUM = new Integer(239);
-       final static String TEST_STREET = "Baker Street";
-       final static String TEST_CITY = "London";
-       final static String TEST_STATE = "London";
-       final static String TEST_ZIP = "NW1 6XE";
-       
-       final static int NUM_RECORDS = 5000;
-
-       @Before
-       public void createFiles() throws IOException {
-               testFile = File.createTempFile("AvroSplittableInputFormatTest", 
null);
-               
-               ArrayList<CharSequence> stringArray = new 
ArrayList<CharSequence>();
-               stringArray.add(TEST_ARRAY_STRING_1);
-               stringArray.add(TEST_ARRAY_STRING_2);
-               
-               ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
-               booleanArray.add(TEST_ARRAY_BOOLEAN_1);
-               booleanArray.add(TEST_ARRAY_BOOLEAN_2);
-               
-               HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, 
Long>();
-               longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
-               longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
-               
-               Address addr = new Address();
-               addr.setNum(new Integer(TEST_NUM));
-               addr.setStreet(TEST_STREET);
-               addr.setCity(TEST_CITY);
-               addr.setState(TEST_STATE);
-               addr.setZip(TEST_ZIP);
-               
-               
-               User user1 = new User();
-               user1.setName(TEST_NAME);
-               user1.setFavoriteNumber(256);
-               user1.setTypeDoubleTest(123.45d);
-               user1.setTypeBoolTest(true);
-               user1.setTypeArrayString(stringArray);
-               user1.setTypeArrayBoolean(booleanArray);
-               user1.setTypeEnum(TEST_ENUM_COLOR);
-               user1.setTypeMap(longMap);
-               user1.setTypeNested(addr);
-               
-               // Construct via builder
-               User user2 = User.newBuilder()
-                            .setName(TEST_NAME)
-                            .setFavoriteColor("blue")
-                            .setFavoriteNumber(null)
-                            .setTypeBoolTest(false)
-                            .setTypeDoubleTest(1.337d)
-                            .setTypeNullTest(null)
-                            .setTypeLongTest(1337L)
-                            .setTypeArrayString(new ArrayList<CharSequence>())
-                            .setTypeArrayBoolean(new ArrayList<Boolean>())
-                            .setTypeNullableArray(null)
-                            .setTypeEnum(Colors.RED)
-                            .setTypeMap(new HashMap<CharSequence, Long>())
-                                        .setTypeFixed(new Fixed16())
-                                        .setTypeUnion(123L)
-                               .setTypeNested(
-                                               
Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
-                                                               
.setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
-                                                               .build())
-
-                            .build();
-               DatumWriter<User> userDatumWriter = new 
SpecificDatumWriter<User>(User.class);
-               DataFileWriter<User> dataFileWriter = new 
DataFileWriter<User>(userDatumWriter);
-               dataFileWriter.create(user1.getSchema(), testFile);
-               dataFileWriter.append(user1);
-               dataFileWriter.append(user2);
-
-               Random rnd = new Random(1337);
-               for(int i = 0; i < NUM_RECORDS -2 ; i++) {
-                       User user = new User();
-                       user.setName(TEST_NAME + rnd.nextInt());
-                       user.setFavoriteNumber(rnd.nextInt());
-                       user.setTypeDoubleTest(rnd.nextDouble());
-                       user.setTypeBoolTest(true);
-                       user.setTypeArrayString(stringArray);
-                       user.setTypeArrayBoolean(booleanArray);
-                       user.setTypeEnum(TEST_ENUM_COLOR);
-                       user.setTypeMap(longMap);
-                       Address address = new Address();
-                       address.setNum(new Integer(TEST_NUM));
-                       address.setStreet(TEST_STREET);
-                       address.setCity(TEST_CITY);
-                       address.setState(TEST_STATE);
-                       address.setZip(TEST_ZIP);
-                       user.setTypeNested(address);
-
-                       dataFileWriter.append(user);
-               }
-               dataFileWriter.close();
-       }
-       
-       @Test
-       public void testSplittedIF() throws IOException {
-               Configuration parameters = new Configuration();
-               
-               AvroInputFormat<User> format = new AvroInputFormat<User>(new 
Path(testFile.getAbsolutePath()), User.class);
-
-               format.configure(parameters);
-               FileInputSplit[] splits = format.createInputSplits(4);
-               assertEquals(splits.length, 4);
-               int elements = 0;
-               int elementsPerSplit[] = new int[4];
-               for(int i = 0; i < splits.length; i++) {
-                       format.open(splits[i]);
-                       while(!format.reachedEnd()) {
-                               User u = format.nextRecord(null);
-                               
Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
-                               elements++;
-                               elementsPerSplit[i]++;
-                       }
-                       format.close();
-               }
-
-               Assert.assertEquals(1539, elementsPerSplit[0]);
-               Assert.assertEquals(1026, elementsPerSplit[1]);
-               Assert.assertEquals(1539, elementsPerSplit[2]);
-               Assert.assertEquals(896, elementsPerSplit[3]);
-               Assert.assertEquals(NUM_RECORDS, elements);
-               format.close();
-       }
-
-       @Test
-       public void testAvroRecoveryWithFailureAtStart() throws Exception {
-               final int recordsUntilCheckpoint = 132;
-
-               Configuration parameters = new Configuration();
-
-               AvroInputFormat<User> format = new AvroInputFormat<User>(new 
Path(testFile.getAbsolutePath()), User.class);
-               format.configure(parameters);
-
-               FileInputSplit[] splits = format.createInputSplits(4);
-               assertEquals(splits.length, 4);
-
-               int elements = 0;
-               int elementsPerSplit[] = new int[4];
-               for(int i = 0; i < splits.length; i++) {
-                       format.reopen(splits[i], format.getCurrentState());
-                       while(!format.reachedEnd()) {
-                               User u = format.nextRecord(null);
-                               
Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
-                               elements++;
-
-                               if(format.getRecordsReadFromBlock() == 
recordsUntilCheckpoint) {
-
-                                       // do the whole checkpoint-restore 
procedure and see if we pick up from where we left off.
-                                       Tuple2<Long, Long> state = 
format.getCurrentState();
-
-                                       // this is to make sure that nothing 
stays from the previous format
-                                       // (as it is going to be in the normal 
case)
-                                       format = new AvroInputFormat<>(new 
Path(testFile.getAbsolutePath()), User.class);
-
-                                       format.reopen(splits[i], state);
-                                       
assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint);
-                               }
-                               elementsPerSplit[i]++;
-                       }
-                       format.close();
-               }
-
-               Assert.assertEquals(1539, elementsPerSplit[0]);
-               Assert.assertEquals(1026, elementsPerSplit[1]);
-               Assert.assertEquals(1539, elementsPerSplit[2]);
-               Assert.assertEquals(896, elementsPerSplit[3]);
-               Assert.assertEquals(NUM_RECORDS, elements);
-               format.close();
-       }
-
-       @Test
-       public void testAvroRecovery() throws Exception {
-               final int recordsUntilCheckpoint = 132;
-
-               Configuration parameters = new Configuration();
-
-               AvroInputFormat<User> format = new AvroInputFormat<User>(new 
Path(testFile.getAbsolutePath()), User.class);
-               format.configure(parameters);
-
-               FileInputSplit[] splits = format.createInputSplits(4);
-               assertEquals(splits.length, 4);
-
-               int elements = 0;
-               int elementsPerSplit[] = new int[4];
-               for(int i = 0; i < splits.length; i++) {
-                       format.open(splits[i]);
-                       while(!format.reachedEnd()) {
-                               User u = format.nextRecord(null);
-                               
Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
-                               elements++;
-
-                               if(format.getRecordsReadFromBlock() == 
recordsUntilCheckpoint) {
-
-                                       // do the whole checkpoint-restore 
procedure and see if we pick up from where we left off.
-                                       Tuple2<Long, Long> state = 
format.getCurrentState();
-
-                                       // this is to make sure that nothing 
stays from the previous format
-                                       // (as it is going to be in the normal 
case)
-                                       format = new AvroInputFormat<>(new 
Path(testFile.getAbsolutePath()), User.class);
-
-                                       format.reopen(splits[i], state);
-                                       
assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint);
-                               }
-                               elementsPerSplit[i]++;
-                       }
-                       format.close();
-               }
-
-               Assert.assertEquals(1539, elementsPerSplit[0]);
-               Assert.assertEquals(1026, elementsPerSplit[1]);
-               Assert.assertEquals(1539, elementsPerSplit[2]);
-               Assert.assertEquals(896, elementsPerSplit[3]);
-               Assert.assertEquals(NUM_RECORDS, elements);
-               format.close();
-       }
-
-       /*
-       This test is gave the reference values for the test of Flink's IF.
-
-       This dependency needs to be added
-
-        <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro-mapred</artifactId>
-            <version>1.7.6</version>
-        </dependency>
-
-       @Test
-       public void testHadoop() throws Exception {
-               JobConf jf = new JobConf();
-               FileInputFormat.addInputPath(jf, new 
org.apache.hadoop.fs.Path(testFile.toURI()));
-               
jf.setBoolean(org.apache.avro.mapred.AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY,
 false);
-               org.apache.avro.mapred.AvroInputFormat<User> format = new 
org.apache.avro.mapred.AvroInputFormat<User>();
-               InputSplit[] sp = format.getSplits(jf, 4);
-               int elementsPerSplit[] = new int[4];
-               int cnt = 0;
-               int i = 0;
-               for(InputSplit s:sp) {
-                       RecordReader<AvroWrapper<User>, NullWritable> r = 
format.getRecordReader(s, jf, new HadoopDummyReporter());
-                       AvroWrapper<User> k = r.createKey();
-                       NullWritable v = r.createValue();
-
-                       while(r.next(k,v)) {
-                               cnt++;
-                               elementsPerSplit[i]++;
-                       }
-                       i++;
-               }
-               System.out.println("Status "+Arrays.toString(elementsPerSplit));
-       } **/
-       
-       @After
-       public void deleteFiles() {
-               testFile.delete();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
deleted file mode 100644
index 5a21691..0000000
--- 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.io.avro.example;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("serial")
-public class AvroTypeExample {
-       
-       
-       public static void main(String[] args) throws Exception {
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<User> users = env.createInput(new 
UserGeneratingInputFormat());
-
-               users
-                       .map(new NumberExtractingMapper())
-                       .groupBy(1)
-                       .reduceGroup(new ConcatenatingReducer())
-                       .print();
-       }
-       
-       
-       public static final class NumberExtractingMapper implements 
MapFunction<User, Tuple2<User, Integer>> {
-               
-               @Override
-               public Tuple2<User, Integer> map(User user) {
-                       return new Tuple2<User, Integer>(user, 
user.getFavoriteNumber());
-               }
-       }
-       
-       
-       public static final class ConcatenatingReducer implements 
GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> {
-
-               @Override
-               public void reduce(Iterable<Tuple2<User, Integer>> values, 
Collector<Tuple2<Integer, String>> out) throws Exception {
-                       int number = 0;
-                       StringBuilder colors = new StringBuilder();
-                       
-                       for (Tuple2<User, Integer> u : values) {
-                               number = u.f1;
-                               colors.append(u.f0.getFavoriteColor()).append(" 
- ");
-                       }
-                       
-                       colors.setLength(colors.length() - 3);
-                       out.collect(new Tuple2<Integer, String>(number, 
colors.toString()));
-               }
-       }
-       
-       
-       public static final class UserGeneratingInputFormat extends 
GenericInputFormat<User> {
-
-               private static final long serialVersionUID = 1L;
-               
-               private static final int NUM = 100;
-               
-               private final Random rnd = new Random(32498562304986L);
-               
-               private static final String[] NAMES = { "Peter", "Bob", 
"Liddy", "Alexander", "Stan" };
-               
-               private static final String[] COLORS = { "mauve", "crimson", 
"copper", "sky", "grass" };
-               
-               private int count;
-               
-
-               @Override
-               public boolean reachedEnd() throws IOException {
-                       return count >= NUM;
-               }
-
-               @Override
-               public User nextRecord(User reuse) throws IOException {
-                       count++;
-                       
-                       User u = new User();
-                       u.setName(NAMES[rnd.nextInt(NAMES.length)]);
-                       u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
-                       u.setFavoriteNumber(rnd.nextInt(87));
-                       return u;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java
 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java
deleted file mode 100644
index 4608f96..0000000
--- 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- * 
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.io.avro.example;  
-@SuppressWarnings("all")
[email protected]
-public class User extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = new 
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
-  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-  @Deprecated public java.lang.CharSequence name;
-  @Deprecated public java.lang.Integer favorite_number;
-  @Deprecated public java.lang.CharSequence favorite_color;
-
-  /**
-   * Default constructor.  Note that this does not initialize fields
-   * to their default values from the schema.  If that is desired then
-   * one should use {@link #newBuilder()}. 
-   */
-  public User() {}
-
-  /**
-   * All-args constructor.
-   */
-  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, 
java.lang.CharSequence favorite_color) {
-    this.name = name;
-    this.favorite_number = favorite_number;
-    this.favorite_color = favorite_color;
-  }
-
-  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
-  // Used by DatumWriter.  Applications should not call. 
-  public java.lang.Object get(int field$) {
-    switch (field$) {
-    case 0: return name;
-    case 1: return favorite_number;
-    case 2: return favorite_color;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-  // Used by DatumReader.  Applications should not call. 
-  @SuppressWarnings(value="unchecked")
-  public void put(int field$, java.lang.Object value$) {
-    switch (field$) {
-    case 0: name = (java.lang.CharSequence)value$; break;
-    case 1: favorite_number = (java.lang.Integer)value$; break;
-    case 2: favorite_color = (java.lang.CharSequence)value$; break;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-
-  /**
-   * Gets the value of the 'name' field.
-   */
-  public java.lang.CharSequence getName() {
-    return name;
-  }
-
-  /**
-   * Sets the value of the 'name' field.
-   * @param value the value to set.
-   */
-  public void setName(java.lang.CharSequence value) {
-    this.name = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_number' field.
-   */
-  public java.lang.Integer getFavoriteNumber() {
-    return favorite_number;
-  }
-
-  /**
-   * Sets the value of the 'favorite_number' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteNumber(java.lang.Integer value) {
-    this.favorite_number = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_color' field.
-   */
-  public java.lang.CharSequence getFavoriteColor() {
-    return favorite_color;
-  }
-
-  /**
-   * Sets the value of the 'favorite_color' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteColor(java.lang.CharSequence value) {
-    this.favorite_color = value;
-  }
-
-  /** Creates a new User RecordBuilder */
-  public static org.apache.flink.api.io.avro.example.User.Builder newBuilder() 
{
-    return new org.apache.flink.api.io.avro.example.User.Builder();
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing Builder */
-  public static org.apache.flink.api.io.avro.example.User.Builder 
newBuilder(org.apache.flink.api.io.avro.example.User.Builder other) {
-    return new org.apache.flink.api.io.avro.example.User.Builder(other);
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing User instance */
-  public static org.apache.flink.api.io.avro.example.User.Builder 
newBuilder(org.apache.flink.api.io.avro.example.User other) {
-    return new org.apache.flink.api.io.avro.example.User.Builder(other);
-  }
-  
-  /**
-   * RecordBuilder for User instances.
-   */
-  public static class Builder extends 
org.apache.avro.specific.SpecificRecordBuilderBase<User>
-    implements org.apache.avro.data.RecordBuilder<User> {
-
-    private java.lang.CharSequence name;
-    private java.lang.Integer favorite_number;
-    private java.lang.CharSequence favorite_color;
-
-    /** Creates a new Builder */
-    private Builder() {
-      super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
-    }
-    
-    /** Creates a Builder by copying an existing Builder */
-    private Builder(org.apache.flink.api.io.avro.example.User.Builder other) {
-      super(other);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), 
other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), 
other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-    }
-    
-    /** Creates a Builder by copying an existing User instance */
-    private Builder(org.apache.flink.api.io.avro.example.User other) {
-            super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), 
other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), 
other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-    }
-
-    /** Gets the value of the 'name' field */
-    public java.lang.CharSequence getName() {
-      return name;
-    }
-    
-    /** Sets the value of the 'name' field */
-    public org.apache.flink.api.io.avro.example.User.Builder 
setName(java.lang.CharSequence value) {
-      validate(fields()[0], value);
-      this.name = value;
-      fieldSetFlags()[0] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'name' field has been set */
-    public boolean hasName() {
-      return fieldSetFlags()[0];
-    }
-    
-    /** Clears the value of the 'name' field */
-    public org.apache.flink.api.io.avro.example.User.Builder clearName() {
-      name = null;
-      fieldSetFlags()[0] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_number' field */
-    public java.lang.Integer getFavoriteNumber() {
-      return favorite_number;
-    }
-    
-    /** Sets the value of the 'favorite_number' field */
-    public org.apache.flink.api.io.avro.example.User.Builder 
setFavoriteNumber(java.lang.Integer value) {
-      validate(fields()[1], value);
-      this.favorite_number = value;
-      fieldSetFlags()[1] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_number' field has been set */
-    public boolean hasFavoriteNumber() {
-      return fieldSetFlags()[1];
-    }
-    
-    /** Clears the value of the 'favorite_number' field */
-    public org.apache.flink.api.io.avro.example.User.Builder 
clearFavoriteNumber() {
-      favorite_number = null;
-      fieldSetFlags()[1] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_color' field */
-    public java.lang.CharSequence getFavoriteColor() {
-      return favorite_color;
-    }
-    
-    /** Sets the value of the 'favorite_color' field */
-    public org.apache.flink.api.io.avro.example.User.Builder 
setFavoriteColor(java.lang.CharSequence value) {
-      validate(fields()[2], value);
-      this.favorite_color = value;
-      fieldSetFlags()[2] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_color' field has been set */
-    public boolean hasFavoriteColor() {
-      return fieldSetFlags()[2];
-    }
-    
-    /** Clears the value of the 'favorite_color' field */
-    public org.apache.flink.api.io.avro.example.User.Builder 
clearFavoriteColor() {
-      favorite_color = null;
-      fieldSetFlags()[2] = false;
-      return this;
-    }
-
-    @Override
-    public User build() {
-      try {
-        User record = new User();
-        record.name = fieldSetFlags()[0] ? this.name : 
(java.lang.CharSequence) defaultValue(fields()[0]);
-        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : 
(java.lang.Integer) defaultValue(fields()[1]);
-        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : 
(java.lang.CharSequence) defaultValue(fields()[2]);
-        return record;
-      } catch (Exception e) {
-        throw new org.apache.avro.AvroRuntimeException(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
deleted file mode 100644
index e245026..0000000
--- 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.io;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.Path;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class AvroInputFormatTypeExtractionTest {
-
-       @Test
-       public void testTypeExtraction() {
-               try {
-                       InputFormat<MyAvroType, ?> format = new 
AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), 
MyAvroType.class);
-
-                       TypeInformation<?> typeInfoDirect = 
TypeExtractor.getInputFormatTypes(format);
-
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       DataSet<MyAvroType> input = env.createInput(format);
-                       TypeInformation<?> typeInfoDataSet = input.getType();
-
-
-                       Assert.assertTrue(typeInfoDirect instanceof 
PojoTypeInfo);
-                       Assert.assertTrue(typeInfoDataSet instanceof 
PojoTypeInfo);
-
-                       Assert.assertEquals(MyAvroType.class, 
typeInfoDirect.getTypeClass());
-                       Assert.assertEquals(MyAvroType.class, 
typeInfoDataSet.getTypeClass());
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-
-       public static final class MyAvroType {
-
-               public String theString;
-
-               public MyAvroType recursive;
-
-               private double aDouble;
-
-               public double getaDouble() {
-                       return aDouble;
-               }
-
-               public void setaDouble(double aDouble) {
-                       this.aDouble = aDouble;
-               }
-
-               public void setTheString(String theString) {
-                       this.theString = theString;
-               }
-
-               public String getTheString() {
-                       return theString;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
 
b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
deleted file mode 100644
index 4d6c6b7..0000000
--- 
a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import static org.apache.flink.api.java.io.AvroOutputFormat.Codec;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-
-import org.apache.avro.Schema;
-import org.apache.flink.api.io.avro.example.User;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.junit.Test;
-import org.mockito.internal.util.reflection.Whitebox;
-
-/**
- * Tests for {@link AvroOutputFormat}
- */
-public class AvroOutputFormatTest {
-
-       @Test
-       public void testSetCodec() throws Exception {
-               // given
-               final AvroOutputFormat<User> outputFormat = new 
AvroOutputFormat<>(User.class);
-
-               // when
-               try {
-                       outputFormat.setCodec(Codec.SNAPPY);
-               } catch (Exception ex) {
-                       // then
-                       fail("unexpected exception");
-               }
-       }
-
-       @Test
-       public void testSetCodecError() throws Exception {
-               // given
-               boolean error = false;
-               final AvroOutputFormat<User> outputFormat = new 
AvroOutputFormat<>(User.class);
-
-               // when
-               try {
-                       outputFormat.setCodec(null);
-               } catch (Exception ex) {
-                       error = true;
-               }
-
-               // then
-               assertTrue(error);
-       }
-
-       @Test
-       public void testSerialization() throws Exception {
-
-               serializeAndDeserialize(null, null);
-               serializeAndDeserialize(null, User.SCHEMA$);
-               for (final Codec codec : Codec.values()) {
-                       serializeAndDeserialize(codec, null);
-                       serializeAndDeserialize(codec, User.SCHEMA$);
-               }
-       }
-
-       private void serializeAndDeserialize(final Codec codec, final Schema 
schema) throws IOException, ClassNotFoundException {
-               // given
-               final AvroOutputFormat<User> outputFormat = new 
AvroOutputFormat<>(User.class);
-               if (codec != null) {
-                       outputFormat.setCodec(codec);
-               }
-               if (schema != null) {
-                       outputFormat.setSchema(schema);
-               }
-
-               final ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
-               // when
-               try (final ObjectOutputStream oos = new 
ObjectOutputStream(bos)) {
-                       oos.writeObject(outputFormat);
-               }
-               try (final ObjectInputStream ois = new ObjectInputStream(new 
ByteArrayInputStream(bos.toByteArray()))) {
-                       // then
-                       Object o = ois.readObject();
-                       assertTrue(o instanceof AvroOutputFormat);
-                       final AvroOutputFormat<User> restored = 
(AvroOutputFormat<User>) o;
-                       final Codec restoredCodec = (Codec) 
Whitebox.getInternalState(restored, "codec");
-                       final Schema restoredSchema = (Schema) 
Whitebox.getInternalState(restored, "userDefinedSchema");
-
-                       assertTrue(codec != null ? restoredCodec == codec : 
restoredCodec == null);
-                       assertTrue(schema != null ? 
restoredSchema.equals(schema) : restoredSchema == null);
-               }
-       }
-
-       @Test
-       public void testCompression() throws Exception {
-               // given
-               final Path outputPath = new 
Path(File.createTempFile("avro-output-file","avro").getAbsolutePath());
-               final AvroOutputFormat<User> outputFormat = new 
AvroOutputFormat<>(outputPath,User.class);
-               outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
-
-               final Path compressedOutputPath = new 
Path(File.createTempFile("avro-output-file","compressed.avro").getAbsolutePath());
-               final AvroOutputFormat<User> compressedOutputFormat = new 
AvroOutputFormat<>(compressedOutputPath,User.class);
-               
compressedOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
-               compressedOutputFormat.setCodec(Codec.SNAPPY);
-
-               // when
-               output(outputFormat);
-               output(compressedOutputFormat);
-
-               // then
-               assertTrue(fileSize(outputPath) > 
fileSize(compressedOutputPath));
-
-               // cleanup
-               Files.delete(Paths.get(outputPath.getPath()));
-               Files.delete(Paths.get(compressedOutputPath.getPath()));
-       }
-
-       private long fileSize(Path path) throws IOException {
-               return Files.size(Paths.get(path.getPath()));
-       }
-
-       private void output(final AvroOutputFormat<User> outputFormat) throws 
IOException {
-               outputFormat.configure(new Configuration());
-               outputFormat.open(1,1);
-               for (int i = 0; i < 100; i++) {
-                       outputFormat.writeRecord(new User("testUser",1,"blue"));
-               }
-               outputFormat.close();
-       }
-}

Reply via email to