http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
deleted file mode 100644
index 19fac43..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class AvroGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest {
-       @Override
-       protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-               return new AvroSerializer<T>(type);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
deleted file mode 100644
index df1ff60..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class AvroGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
-
-       @Override
-       protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-               return new AvroSerializer<T>(type);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
deleted file mode 100644
index 8a89410..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.reflect.Nullable;
-import org.apache.flink.api.common.typeutils.SerializerTestInstance;
-import org.junit.Test;
-
-public class AvroSerializerEmptyArrayTest {
-
-       @Test
-       public void testBookSerialization() {
-               try {
-                       Book b = new Book(123, "This is a test book", 26382648);
-                       AvroSerializer<Book> serializer = new AvroSerializer<Book>(Book.class);
-                       SerializerTestInstance<Book> test = new SerializerTestInstance<Book>(serializer, Book.class, -1, b);
-                       test.testAll();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testSerialization() {
-               try {
-                       List<String> titles = new ArrayList<String>();
-
-                       List<Book> books = new ArrayList<Book>();
-                       books.add(new Book(123, "This is a test book", 1));
-                       books.add(new Book(24234234, "This is a test book", 1));
-                       books.add(new Book(1234324, "This is a test book", 3));
-
-                       BookAuthor a = new BookAuthor(1, titles, "Test Author");
-                       a.books = books;
-                       a.bookType = BookAuthor.BookType.journal;
-                       
-                       AvroSerializer<BookAuthor> serializer = new AvroSerializer<BookAuthor>(BookAuthor.class);
-                       
-                       SerializerTestInstance<BookAuthor> test = new SerializerTestInstance<BookAuthor>(serializer, BookAuthor.class, -1, a);
-                       test.testAll();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       public static class Book {
-
-               long bookId;
-               @Nullable
-               String title;
-               long authorId;
-
-               public Book() {}
-
-               public Book(long bookId, String title, long authorId) {
-                       this.bookId = bookId;
-                       this.title = title;
-                       this.authorId = authorId;
-               }
-
-               @Override
-               public int hashCode() {
-                       final int prime = 31;
-                       int result = 1;
-                       result = prime * result + (int) (authorId ^ (authorId >>> 32));
-                       result = prime * result + (int) (bookId ^ (bookId >>> 32));
-                       result = prime * result + ((title == null) ? 0 : title.hashCode());
-                       return result;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       if (this == obj)
-                               return true;
-                       if (obj == null)
-                               return false;
-                       if (getClass() != obj.getClass())
-                               return false;
-                       Book other = (Book) obj;
-                       if (authorId != other.authorId)
-                               return false;
-                       if (bookId != other.bookId)
-                               return false;
-                       if (title == null) {
-                               if (other.title != null)
-                                       return false;
-                       } else if (!title.equals(other.title))
-                               return false;
-                       return true;
-               }
-       }
-
-       public static class BookAuthor {
-
-               enum BookType {
-                       book,
-                       article,
-                       journal
-               }
-
-               long authorId;
-
-               @Nullable
-               List<String> bookTitles;
-
-               @Nullable
-               List<Book> books;
-
-               String authorName;
-
-               BookType bookType;
-
-               public BookAuthor() {}
-
-               public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
-                       this.authorId = authorId;
-                       this.bookTitles = bookTitles;
-                       this.authorName = authorName;
-               }
-
-               @Override
-               public int hashCode() {
-                       final int prime = 31;
-                       int result = 1;
-                       result = prime * result + (int) (authorId ^ (authorId >>> 32));
-                       result = prime * result + ((authorName == null) ? 0 : authorName.hashCode());
-                       result = prime * result + ((bookTitles == null) ? 0 : bookTitles.hashCode());
-                       result = prime * result + ((bookType == null) ? 0 : bookType.hashCode());
-                       result = prime * result + ((books == null) ? 0 : books.hashCode());
-                       return result;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       if (this == obj)
-                               return true;
-                       if (obj == null)
-                               return false;
-                       if (getClass() != obj.getClass())
-                               return false;
-                       BookAuthor other = (BookAuthor) obj;
-                       if (authorId != other.authorId)
-                               return false;
-                       if (authorName == null) {
-                               if (other.authorName != null)
-                                       return false;
-                       } else if (!authorName.equals(other.authorName))
-                               return false;
-                       if (bookTitles == null) {
-                               if (other.bookTitles != null)
-                                       return false;
-                       } else if (!bookTitles.equals(other.bookTitles))
-                               return false;
-                       if (bookType != other.bookType)
-                               return false;
-                       if (books == null) {
-                               if (other.books != null)
-                                       return false;
-                       } else if (!books.equals(other.books))
-                               return false;
-                       return true;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java
deleted file mode 100644
index b317275..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.types.StringValue;
-
-public class CopyableValueComparatorTest extends ComparatorTestBase<StringValue> {
-
-       StringValue[] data = new StringValue[]{
-               new StringValue(""),
-               new StringValue("Lorem Ipsum Dolor Omit Longer"),
-               new StringValue("aaaa"),
-               new StringValue("abcd"),
-               new StringValue("abce"),
-               new StringValue("abdd"),
-               new StringValue("accd"),
-               new StringValue("bbcd")
-       };
-
-       @Override
-       protected TypeComparator<StringValue> createComparator(boolean ascending) {
-               return new CopyableValueComparator<StringValue>(ascending, StringValue.class);
-       }
-
-       @Override
-       protected TypeSerializer<StringValue> createSerializer() {
-               return new CopyableValueSerializer<StringValue>(StringValue.class);
-       }
-
-       @Override
-       protected StringValue[] getSortedTestData() {
-               return data;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
deleted file mode 100644
index e4672cc..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.apache.flink.api.java.typeutils.Either.Left;
-import static org.apache.flink.api.java.typeutils.Either.Right;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.SerializerTestInstance;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.Either;
-import org.apache.flink.api.java.typeutils.EitherTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.junit.Test;
-
-public class EitherSerializerTest {
-
-       @SuppressWarnings("unchecked")
-       @Test
-       public void testStringDoubleEither() {
-
-       Either<String, Double>[] testData = new Either[] {
-                       Left("banana"),
-                       Left(""),
-                       Right(32.0),
-                       Right(Double.MIN_VALUE),
-                       Right(Double.MAX_VALUE)};
-
-       EitherTypeInfo<String, Double> eitherTypeInfo = (EitherTypeInfo<String, Double>) new EitherTypeInfo<String, Double>(
-                       BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO);
-       EitherSerializer<String, Double> eitherSerializer =
-                       (EitherSerializer<String, Double>) eitherTypeInfo.createSerializer(new ExecutionConfig());
-       SerializerTestInstance<Either<String, Double>> testInstance =
-                       new EitherSerializerTestInstance<Either<String, Double>>(eitherSerializer, eitherTypeInfo.getTypeClass(), -1, testData);
-       testInstance.testAll();
-       }
-
-       @SuppressWarnings("unchecked")
-       @Test
-       public void testEitherWithTuple() {
-
-       Either<Tuple2<Long, Long>, Double>[] testData = new Either[] {
-                       Either.Left(new Tuple2<>(2l, 9l)),
-                       new Left<>(new Tuple2<>(Long.MIN_VALUE, Long.MAX_VALUE)),
-                       new Right<>(32.0),
-                       Right(Double.MIN_VALUE),
-                       Right(Double.MAX_VALUE)};
-
-       EitherTypeInfo<Tuple2<Long, Long>, Double> eitherTypeInfo = (EitherTypeInfo<Tuple2<Long, Long>, Double>)
-                       new EitherTypeInfo<Tuple2<Long, Long>, Double>(
-                       new TupleTypeInfo<Tuple2<Long, Long>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO),
-                       BasicTypeInfo.DOUBLE_TYPE_INFO);
-       EitherSerializer<Tuple2<Long, Long>, Double> eitherSerializer =
-                       (EitherSerializer<Tuple2<Long, Long>, Double>) eitherTypeInfo.createSerializer(new ExecutionConfig());
-       SerializerTestInstance<Either<Tuple2<Long, Long>, Double>> testInstance =
-                       new EitherSerializerTestInstance<Either<Tuple2<Long, Long>, Double>>(
-                                       eitherSerializer, eitherTypeInfo.getTypeClass(), -1, testData);
-       testInstance.testAll();
-       }
-
-       /**
-        * {@link org.apache.flink.api.common.typeutils.SerializerTestBase#testInstantiate()}
-        * checks that the type of the created instance is the same as the type class parameter.
-        * Since we arbitrarily create always create a Left instance we override this test.
-        */
-       private class EitherSerializerTestInstance<T> extends SerializerTestInstance<T> {
-
-               public EitherSerializerTestInstance(TypeSerializer<T> serializer,
-                               Class<T> typeClass, int length, T[] testData) {
-                       super(serializer, typeClass, length, testData);
-               }
-
-               @Override
-               @Test
-               public void testInstantiate() {
-                       try {
-                               TypeSerializer<T> serializer = getSerializer();
-
-                               T instance = serializer.createInstance();
-                               assertNotNull("The created instance must not be null.", instance);
-                               
-                               Class<T> type = getTypeClass();
-                               assertNotNull("The test is corrupt: type class is null.", type);
-                       }
-                       catch (Exception e) {
-                               System.err.println(e.getMessage());
-                               e.printStackTrace();
-                               fail("Exception in test: " + e.getMessage());
-                       }
-               }
-               
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparatorTest.java
deleted file mode 100644
index 06330d3..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericPairComparatorTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.GenericPairComparator;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleComparator;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.typeutils.runtime.tuple.base.TuplePairComparatorTestBase;
-
-public class GenericPairComparatorTest extends TuplePairComparatorTestBase<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> {
-
-       @SuppressWarnings("unchecked")
-       private Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
-               new Tuple3<Integer, String, Double>(4, "hello", 20.0),
-               new Tuple3<Integer, String, Double>(4, "world", 23.2),
-               new Tuple3<Integer, String, Double>(5, "hello", 18.0),
-               new Tuple3<Integer, String, Double>(5, "world", 19.2),
-               new Tuple3<Integer, String, Double>(6, "hello", 16.0),
-               new Tuple3<Integer, String, Double>(6, "world", 17.2),
-               new Tuple3<Integer, String, Double>(7, "hello", 14.0),
-               new Tuple3<Integer, String, Double>(7, "world", 15.2)
-       };
-
-       @SuppressWarnings("unchecked")
-       private Tuple4<Integer, Float, Long, Double>[] dataIDL = new Tuple4[]{
-               new Tuple4<Integer, Float, Long, Double>(4, 0.11f, 14L, 20.0),
-               new Tuple4<Integer, Float, Long, Double>(4, 0.221f, 15L, 23.2),
-               new Tuple4<Integer, Float, Long, Double>(5, 0.33f, 15L, 18.0),
-               new Tuple4<Integer, Float, Long, Double>(5, 0.44f, 20L, 19.2),
-               new Tuple4<Integer, Float, Long, Double>(6, 0.55f, 20L, 16.0),
-               new Tuple4<Integer, Float, Long, Double>(6, 0.66f, 29L, 17.2),
-               new Tuple4<Integer, Float, Long, Double>(7, 0.77f, 29L, 14.0),
-               new Tuple4<Integer, Float, Long, Double>(7, 0.88f, 34L, 15.2)
-       };
-
-       @SuppressWarnings("rawtypes")
-       @Override
-       protected GenericPairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> createComparator(boolean ascending) {
-               int[] fields1 = new int[]{0, 2};
-               int[] fields2 = new int[]{0, 3};
-               TypeComparator[] comps1 = new TypeComparator[]{
-                               new IntComparator(ascending),
-                               new DoubleComparator(ascending)
-               };
-               TypeComparator[] comps2 = new TypeComparator[]{
-                               new IntComparator(ascending),
-                               new DoubleComparator(ascending)
-               };
-               TypeSerializer[] sers1 = new TypeSerializer[]{
-                               IntSerializer.INSTANCE,
-                               DoubleSerializer.INSTANCE
-               };
-               TypeSerializer[] sers2= new TypeSerializer[]{
-                               IntSerializer.INSTANCE,
-                               DoubleSerializer.INSTANCE
-               };
-               TypeComparator<Tuple3<Integer, String, Double>> comp1 = new TupleComparator<Tuple3<Integer, String, Double>>(fields1, comps1, sers1);
-               TypeComparator<Tuple4<Integer, Float, Long, Double>> comp2 = new TupleComparator<Tuple4<Integer, Float, Long, Double>>(fields2, comps2, sers2);
-               return new GenericPairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>>(comp1, comp2);
-       }
-
-       @Override
-       protected Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]> getSortedTestData() {
-               return new Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]>(dataISD, dataIDL);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java
deleted file mode 100644
index 1c97816..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.SerializerTestInstance;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.junit.Test;
-
-
-/**
- * A serialization test for multidimensional arrays.
- */
-public class MultidimensionalArraySerializerTest {
-
-       @Test
-       public void testStringArray() {
-               String[][] array = new String[][]{{null,"b"},{"c","d"},{"e","f"},{"g","h"},null};
-               TypeInformation<String[][]> ti = TypeExtractor.getForClass(String[][].class);
-
-               SerializerTestInstance<String[][]> testInstance = new SerializerTestInstance<String[][]>(ti.createSerializer(new ExecutionConfig()), String[][].class, -1, array);
-               testInstance.testAll();
-       }
-
-       @Test
-       public void testPrimitiveArray() {
-               int[][] array = new int[][]{{12,1},{48,42},{23,80},{484,849},{987,4}};
-               TypeInformation<int[][]> ti = TypeExtractor.getForClass(int[][].class);
-
-               SerializerTestInstance<int[][]> testInstance = new SerializerTestInstance<int[][]>(ti.createSerializer(new ExecutionConfig()), int[][].class, -1, array);
-               testInstance.testAll();
-       }
-
-       public static class MyPojo {
-               public String field1;
-               public int field2;
-
-               public MyPojo(String field1, int field2) {
-                       this.field1 = field1;
-                       this.field2 = field2;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       if (!(obj instanceof MyPojo)) {
-                               return false;
-                       }
-                       MyPojo other = (MyPojo) obj;
-                       return ((field1 == null && other.field1 == null) || (field1 != null && field1.equals(other.field1)))
-                                       && field2 == other.field2;
-               }
-       }
-
-       @Test
-       public void testObjectArrays() {
-               Integer[][] array = new Integer[][]{{0,1}, null, {null, 42}};
-               TypeInformation<Integer[][]> ti = TypeExtractor.getForClass(Integer[][].class);
-
-               SerializerTestInstance<Integer[][]> testInstance = new SerializerTestInstance<Integer[][]>(ti.createSerializer(new ExecutionConfig()), Integer[][].class, -1, array);
-               testInstance.testAll();
-
-               MyPojo[][] array2 = new MyPojo[][]{{new MyPojo(null, 42), new MyPojo("test2", -1)}, {null, null}, null};
-               TypeInformation<MyPojo[][]> ti2 = TypeExtractor.getForClass(MyPojo[][].class);
-
-               SerializerTestInstance<MyPojo[][]> testInstance2 = new SerializerTestInstance<MyPojo[][]>(ti2.createSerializer(new ExecutionConfig()), MyPojo[][].class, -1, array2);
-               testInstance2.testAll();
-       }
-
-       public static class MyGenericPojo<T> {
-               public T[][] field;
-
-               public MyGenericPojo() {
-                       // nothing to do
-               }
-
-               public MyGenericPojo(T[][] field) {
-                       this.field = field;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       if (!(obj instanceof MyGenericPojo)) {
-                               return false;
-                       }
-                       MyGenericPojo<?> other = (MyGenericPojo<?>) obj;
-                       return (field == null && other.field == null) || (field != null && field.length == other.field.length);
-               }
-       }
-
-       @SuppressWarnings({ "unchecked", "rawtypes" })
-       @Test
-       public void testGenericObjectArrays() {
-               MyGenericPojo<String>[][] array = (MyGenericPojo<String>[][]) new MyGenericPojo[][]{
-                       { new MyGenericPojo<String>(new String[][]{{"a", "b"},{"c", "d"}}), null}
-               };
-               TypeInformation ti = TypeInfoParser.parse("org.apache.flink.api.java.typeutils.runtime.MultidimensionalArraySerializerTest$MyGenericPojo<field=String[][]>[][]");
-
-               SerializerTestInstance testInstance = new SerializerTestInstance(ti.createSerializer(new ExecutionConfig()), MyGenericPojo[][].class, -1, (Object) array);
-               testInstance.testAll();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
deleted file mode 100644
index 1baf443..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.junit.Assert;
-
-
-public class PojoComparatorTest extends 
ComparatorTestBase<PojoContainingTuple> {
-       TypeInformation<PojoContainingTuple> type = 
TypeExtractor.getForClass(PojoContainingTuple.class);
-       
-       PojoContainingTuple[] data = new PojoContainingTuple[]{
-               new PojoContainingTuple(1, 1L, 1L),
-               new PojoContainingTuple(2, 2L, 2L),
-               new PojoContainingTuple(8519, 85190L, 85190L),
-               new PojoContainingTuple(8520, 85191L, 85191L),
-       };
-
-       @Override
-       protected TypeComparator<PojoContainingTuple> createComparator(boolean 
ascending) {
-               Assert.assertTrue(type instanceof CompositeType);
-               CompositeType<PojoContainingTuple> cType = 
(CompositeType<PojoContainingTuple>) type;
-               ExpressionKeys<PojoContainingTuple> keys = new 
ExpressionKeys<PojoContainingTuple>(new String[] {"theTuple.*"}, cType);
-               boolean[] orders = new boolean[keys.getNumberOfKeyFields()];
-               Arrays.fill(orders, ascending);
-               return 
cType.createComparator(keys.computeLogicalKeyPositions(), orders, 0, new 
ExecutionConfig());
-       }
-
-       @Override
-       protected TypeSerializer<PojoContainingTuple> createSerializer() {
-               return type.createSerializer(new ExecutionConfig());
-       }
-
-       @Override
-       protected PojoContainingTuple[] getSortedTestData() {
-               return data;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoContainingTuple.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoContainingTuple.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoContainingTuple.java
deleted file mode 100644
index ca17ee0..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoContainingTuple.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/**
- * This file belongs to the PojoComparatorTest test
- *
- */
-public class PojoContainingTuple {
-       public int someInt;
-       public String someString = "abc";
-       public Tuple2<Long, Long> theTuple;
-       public PojoContainingTuple() {}
-       public PojoContainingTuple(int i, long l1, long l2) {
-               someInt = i;
-               theTuple = new Tuple2<Long, Long>(l1, l2);
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if(obj instanceof PojoContainingTuple) {
-                       PojoContainingTuple other = (PojoContainingTuple) obj;
-                       return someInt == other.someInt && 
theTuple.equals(other.theTuple);
-               }
-               return false;
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java
deleted file mode 100644
index d405412..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class PojoGenericTypeSerializerTest extends 
AbstractGenericTypeSerializerTest {
-       
-       @Override
-       protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-               TypeInformation<T> typeInfo = TypeExtractor.getForClass(type);
-               return typeInfo.createSerializer(new ExecutionConfig());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
deleted file mode 100644
index b1467b9..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
-import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.base.Objects;
-
-/**
- * A test for the {@link PojoSerializer}.
- */
-public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.TestUserClass> {
-       private TypeInformation<TestUserClass> type = 
TypeExtractor.getForClass(TestUserClass.class);
-
-       @Override
-       protected TypeSerializer<TestUserClass> createSerializer() {
-               TypeSerializer<TestUserClass> serializer = 
type.createSerializer(new ExecutionConfig());
-               assert(serializer instanceof PojoSerializer);
-               return serializer;
-       }
-
-       @Override
-       protected int getLength() {
-               return -1;
-       }
-
-       @Override
-       protected Class<TestUserClass> getTypeClass() {
-               return TestUserClass.class;
-       }
-
-       @Override
-       protected TestUserClass[] getTestData() {
-               Random rnd = new Random(874597969123412341L);
-
-               return new TestUserClass[]{
-                               new TestUserClass(rnd.nextInt(), "foo", 
rnd.nextDouble(), new int[]{1, 2, 3}, new Date(),
-                                               new 
NestedTestUserClass(rnd.nextInt(), "foo@boo", rnd.nextDouble(), new int[]{10, 
11, 12})),
-                               new TestUserClass(rnd.nextInt(), "bar", 
rnd.nextDouble(), new int[]{4, 5, 6}, null,
-                                               new 
NestedTestUserClass(rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[]{20, 
21, 22})),
-                               new TestUserClass(rnd.nextInt(), null, 
rnd.nextDouble(), null, null, null),
-                               new TestUserClass(rnd.nextInt(), "bar", 
rnd.nextDouble(), new int[]{4, 5, 6}, new Date(),
-                                               new 
NestedTestUserClass(rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[]{20, 
21, 22}))
-               };
-
-       }
-
-       // User code class for testing the serializer
-       public static class TestUserClass {
-               public int dumm1;
-               public String dumm2;
-               public double dumm3;
-               public int[] dumm4;
-               public Date dumm5;
-
-               public NestedTestUserClass nestedClass;
-
-               public TestUserClass() {
-               }
-
-               public TestUserClass(int dumm1, String dumm2, double dumm3, 
int[] dumm4, Date dumm5, NestedTestUserClass nestedClass) {
-                       this.dumm1 = dumm1;
-                       this.dumm2 = dumm2;
-                       this.dumm3 = dumm3;
-                       this.dumm4 = dumm4;
-                       this.dumm5 = dumm5;
-                       this.nestedClass = nestedClass;
-               }
-
-               @Override
-               public int hashCode() {
-                       return Objects.hashCode(dumm1, dumm2, dumm3, dumm4, 
nestedClass);
-               }
-
-               @Override
-               public boolean equals(Object other) {
-                       if (!(other instanceof TestUserClass)) {
-                               return false;
-                       }
-                       TestUserClass otherTUC = (TestUserClass) other;
-                       if (dumm1 != otherTUC.dumm1) {
-                               return false;
-                       }
-                       if ((dumm2 == null && otherTUC.dumm2 != null)
-                                       || (dumm2 != null && 
!dumm2.equals(otherTUC.dumm2))) {
-                               return false;
-                       }
-                       if (dumm3 != otherTUC.dumm3) {
-                               return false;
-                       }
-                       if ((dumm4 != null && otherTUC.dumm4 == null)
-                                       || (dumm4 == null && otherTUC.dumm4 != 
null)
-                                       || (dumm4 != null && otherTUC.dumm4 != 
null && dumm4.length != otherTUC.dumm4.length)) {
-                               return false;
-                       }
-                       if (dumm4 != null && otherTUC.dumm4 != null) {
-                               for (int i = 0; i < dumm4.length; i++) {
-                                       if (dumm4[i] != otherTUC.dumm4[i]) {
-                                               return false;
-                                       }
-                               }
-                       }
-                       
-                       if ((nestedClass == null && otherTUC.nestedClass != 
null)
-                                       || (nestedClass != null && 
!nestedClass.equals(otherTUC.nestedClass))) {
-                               return false;
-                       }
-                       return true;
-               }
-       }
-
-       public static class NestedTestUserClass {
-               public int dumm1;
-               public String dumm2;
-               public double dumm3;
-               public int[] dumm4;
-
-               public NestedTestUserClass() {
-               }
-
-               public NestedTestUserClass(int dumm1, String dumm2, double 
dumm3, int[] dumm4) {
-                       this.dumm1 = dumm1;
-                       this.dumm2 = dumm2;
-                       this.dumm3 = dumm3;
-                       this.dumm4 = dumm4;
-               }
-
-               @Override
-               public int hashCode() {
-                       return Objects.hashCode(dumm1, dumm2, dumm3, dumm4);
-               }
-
-               @Override
-               public boolean equals(Object other) {
-                       if (!(other instanceof NestedTestUserClass)) {
-                               return false;
-                       }
-                       NestedTestUserClass otherTUC = (NestedTestUserClass) 
other;
-                       if (dumm1 != otherTUC.dumm1) {
-                               return false;
-                       }
-                       if (!dumm2.equals(otherTUC.dumm2)) {
-                               return false;
-                       }
-                       if (dumm3 != otherTUC.dumm3) {
-                               return false;
-                       }
-                       if (dumm4.length != otherTUC.dumm4.length) {
-                               return false;
-                       }
-                       for (int i = 0; i < dumm4.length; i++) {
-                               if (dumm4[i] != otherTUC.dumm4[i]) {
-                                       return false;
-                               }
-                       }
-                       return true;
-               }
-       }
-       
-       /**
-        * This tests if the hashes returned by the pojo and tuple comparators 
are the same
-        */
-       @SuppressWarnings({ "rawtypes", "unchecked" })
-       @Test
-       public void testTuplePojoTestEquality() {
-               
-               // test with a simple, string-key first.
-               PojoTypeInfo<TestUserClass> pType = 
(PojoTypeInfo<TestUserClass>) type;
-               List<FlatFieldDescriptor> result = new 
ArrayList<FlatFieldDescriptor>();
-               pType.getFlatFields("nestedClass.dumm2", 0, result);
-               int[] fields = new int[1]; // see below
-               fields[0] = result.get(0).getPosition();
-               TypeComparator<TestUserClass> pojoComp = 
pType.createComparator( fields, new boolean[]{true}, 0, new ExecutionConfig());
-               
-               TestUserClass pojoTestRecord = new TestUserClass(0, "abc", 3d, 
new int[] {1,2,3}, new Date(), new NestedTestUserClass(1, "haha", 4d, new int[] 
{5,4,3}));
-               int pHash = pojoComp.hash(pojoTestRecord);
-               
-               Tuple1<String> tupleTest = new Tuple1<String>("haha");
-               TupleTypeInfo<Tuple1<String>> tType = 
(TupleTypeInfo<Tuple1<String>>)TypeExtractor.getForObject(tupleTest);
-               TypeComparator<Tuple1<String>> tupleComp = 
tType.createComparator(new int[] {0}, new boolean[] {true}, 0, new 
ExecutionConfig());
-               
-               int tHash = tupleComp.hash(tupleTest);
-               
-               Assert.assertTrue("The hashing for tuples and pojos must be the 
same, so that they are mixable", pHash == tHash);
-               
-               Tuple3<Integer, String, Double> multiTupleTest = new 
Tuple3<Integer, String, Double>(1, "haha", 4d); // its important here to use 
the same values.
-               TupleTypeInfo<Tuple3<Integer, String, Double>> multiTupleType = 
(TupleTypeInfo<Tuple3<Integer, String, 
Double>>)TypeExtractor.getForObject(multiTupleTest);
-               
-               ExpressionKeys fieldKey = new ExpressionKeys(new int[]{1,0,2}, 
multiTupleType);
-               ExpressionKeys expressKey = new ExpressionKeys(new String[] 
{"nestedClass.dumm2", "nestedClass.dumm1", "nestedClass.dumm3"}, pType);
-               try {
-                       Assert.assertTrue("Expecting the keys to be 
compatible", fieldKey.areCompatible(expressKey));
-               } catch (IncompatibleKeysException e) {
-                       e.printStackTrace();
-                       Assert.fail("Keys must be compatible: "+e.getMessage());
-               }
-               TypeComparator<TestUserClass> multiPojoComp = 
pType.createComparator( expressKey.computeLogicalKeyPositions(), new 
boolean[]{true, true, true}, 0, new ExecutionConfig());
-               int multiPojoHash = multiPojoComp.hash(pojoTestRecord);
-               
-               
-               // pojo order is: dumm2 (str), dumm1 (int), dumm3 (double).
-               TypeComparator<Tuple3<Integer, String, Double>> multiTupleComp 
= multiTupleType.createComparator(fieldKey.computeLogicalKeyPositions(), new 
boolean[] {true, true,true}, 0, new ExecutionConfig());
-               int multiTupleHash = multiTupleComp.hash(multiTupleTest);
-               
-               Assert.assertTrue("The hashing for tuples and pojos must be the 
same, so that they are mixable. Also for those with multiple key fields", 
multiPojoHash == multiTupleHash);
-               
-       }
-}      

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassComparatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassComparatorTest.java
deleted file mode 100644
index 3a03683..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassComparatorTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.junit.Assert;
-
-import java.util.Arrays;
-
-
-public class PojoSubclassComparatorTest extends 
ComparatorTestBase<PojoContainingTuple> {
-       TypeInformation<PojoContainingTuple> type = 
TypeExtractor.getForClass(PojoContainingTuple.class);
-       
-       PojoContainingTuple[] data = new PojoContainingTuple[]{
-               new Subclass(1, 1L, 1L, 17L),
-               new Subclass(2, 2L, 2L, 42L),
-               new Subclass(8519, 85190L, 85190L, 117L),
-               new Subclass(8520, 85191L, 85191L, 93L),
-       };
-
-       @Override
-       protected TypeComparator<PojoContainingTuple> createComparator(boolean 
ascending) {
-               Assert.assertTrue(type instanceof CompositeType);
-               CompositeType<PojoContainingTuple> cType = 
(CompositeType<PojoContainingTuple>) type;
-               ExpressionKeys<PojoContainingTuple> keys = new 
ExpressionKeys<PojoContainingTuple>(new String[] {"theTuple.*"}, cType);
-               boolean[] orders = new boolean[keys.getNumberOfKeyFields()];
-               Arrays.fill(orders, ascending);
-               return 
cType.createComparator(keys.computeLogicalKeyPositions(), orders, 0, new 
ExecutionConfig());
-       }
-
-       @Override
-       protected TypeSerializer<PojoContainingTuple> createSerializer() {
-               return type.createSerializer(new ExecutionConfig());
-       }
-
-       @Override
-       protected PojoContainingTuple[] getSortedTestData() {
-               return data;
-       }
-
-       public static class Subclass extends PojoContainingTuple {
-
-               public long additionalField;
-
-               public Subclass() {
-               }
-
-               public Subclass(int i, long l1, long l2, long additionalField) {
-                       super(i, l1, l2);
-                       this.additionalField = additionalField;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
deleted file mode 100644
index 8c61a19..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import com.google.common.base.Objects;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.junit.Test;
-
-import java.util.Random;
-
-/**
- * A test for the {@link PojoSerializer}.
- */
-public class PojoSubclassSerializerTest extends 
SerializerTestBase<PojoSubclassSerializerTest.TestUserClassBase> {
-       private TypeInformation<TestUserClassBase> type = 
TypeExtractor.getForClass(TestUserClassBase.class);
-
-       @Override
-       protected TypeSerializer<TestUserClassBase> createSerializer() {
-               // only register one of the three child classes, the third 
child class is NO POJO
-               ExecutionConfig conf = new ExecutionConfig();
-               conf.registerPojoType(TestUserClass1.class);
-               TypeSerializer<TestUserClassBase> serializer = 
type.createSerializer(conf);
-               assert(serializer instanceof PojoSerializer);
-               return serializer;
-       }
-
-       @Override
-       protected int getLength() {
-               return -1;
-       }
-
-       @Override
-       protected Class<TestUserClassBase> getTypeClass() {
-               return TestUserClassBase.class;
-       }
-
-       @Override
-       protected TestUserClassBase[] getTestData() {
-               Random rnd = new Random(874597969123412341L);
-
-               return new TestUserClassBase[]{
-                               new TestUserClass1(rnd.nextInt(), "foo", 
rnd.nextLong()),
-                               new TestUserClass2(rnd.nextInt(), "bar", 
rnd.nextFloat()),
-                               new TestUserClass3(rnd.nextInt(), "bar", 
rnd.nextFloat())
-               };
-
-       }
-
-       @Override
-       @Test
-       public void testInstantiate() {
-               // don't do anything, since the PojoSerializer with subclass 
will return null
-       }
-
-       // User code class for testing the serializer
-       public static abstract class TestUserClassBase {
-               public int dumm1;
-               public String dumm2;
-
-
-               public TestUserClassBase() {
-               }
-
-               public TestUserClassBase(int dumm1, String dumm2) {
-                       this.dumm1 = dumm1;
-                       this.dumm2 = dumm2;
-               }
-
-               @Override
-               public int hashCode() {
-                       return Objects.hashCode(dumm1, dumm2);
-               }
-
-               @Override
-               public boolean equals(Object other) {
-                       if (!(other instanceof TestUserClassBase)) {
-                               return false;
-                       }
-                       TestUserClassBase otherTUC = (TestUserClassBase) other;
-                       if (dumm1 != otherTUC.dumm1) {
-                               return false;
-                       }
-                       if (!dumm2.equals(otherTUC.dumm2)) {
-                               return false;
-                       }
-                       return true;
-               }
-       }
-
-       public static class TestUserClass1 extends TestUserClassBase {
-               public long dumm3;
-
-               public TestUserClass1() {
-               }
-
-               public TestUserClass1(int dumm1, String dumm2, long dumm3) {
-                       super(dumm1, dumm2);
-                       this.dumm3 = dumm3;
-               }
-
-               @Override
-               public boolean equals(Object other) {
-                       if (!(other instanceof TestUserClass1)) {
-                               return false;
-                       }
-                       TestUserClass1 otherTUC = (TestUserClass1) other;
-                       if (dumm1 != otherTUC.dumm1) {
-                               return false;
-                       }
-                       if (!dumm2.equals(otherTUC.dumm2)) {
-                               return false;
-                       }
-                       if (dumm3 != otherTUC.dumm3) {
-                               return false;
-                       }
-                       return true;
-               }
-       }
-
-       public static class TestUserClass2 extends TestUserClassBase {
-               public float dumm4;
-
-               public TestUserClass2() {
-               }
-
-               public TestUserClass2(int dumm1, String dumm2, float dumm4) {
-                       super(dumm1, dumm2);
-                       this.dumm4 = dumm4;
-               }
-
-               @Override
-               public boolean equals(Object other) {
-                       if (!(other instanceof TestUserClass2)) {
-                               return false;
-                       }
-                       TestUserClass2 otherTUC = (TestUserClass2) other;
-                       if (dumm1 != otherTUC.dumm1) {
-                               return false;
-                       }
-                       if (!dumm2.equals(otherTUC.dumm2)) {
-                               return false;
-                       }
-                       if (dumm4 != otherTUC.dumm4) {
-                               return false;
-                       }
-                       return true;
-               }
-       }
-
-       public static class TestUserClass3 extends TestUserClassBase {
-               public float dumm4;
-
-               public TestUserClass3(int dumm1, String dumm2, float dumm4) {
-                       super(dumm1, dumm2);
-                       this.dumm4 = dumm4;
-               }
-
-               @Override
-               public boolean equals(Object other) {
-                       if (!(other instanceof TestUserClass3)) {
-                               return false;
-                       }
-                       TestUserClass3 otherTUC = (TestUserClass3) other;
-                       if (dumm1 != otherTUC.dumm1) {
-                               return false;
-                       }
-                       if (!dumm2.equals(otherTUC.dumm2)) {
-                               return false;
-                       }
-                       if (dumm4 != otherTUC.dumm4) {
-                               return false;
-                       }
-                       return true;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
deleted file mode 100644
index 7c608f2..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/StringArrayWritable.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-public class StringArrayWritable implements Writable, 
Comparable<StringArrayWritable> {
-       
-       private String[] array = new String[0];
-       
-       public StringArrayWritable() {
-               super();
-       }
-       
-       public StringArrayWritable(String[] array) {
-               this.array = array;
-       }
-       
-       @Override
-       public void write(DataOutput out) throws IOException {
-               out.writeInt(this.array.length);
-               
-               for(String str : this.array) {
-                       byte[] b = str.getBytes();
-                       out.writeInt(b.length);
-                       out.write(b);
-               }
-       }
-       
-       @Override
-       public void readFields(DataInput in) throws IOException {
-               this.array = new String[in.readInt()];
-               
-               for(int i = 0; i < this.array.length; i++) {
-                       byte[] b = new byte[in.readInt()];
-                       in.readFully(b);
-                       this.array[i] = new String(b);
-               }
-       }
-       
-       @Override
-       public int compareTo(StringArrayWritable o) {
-               if(this.array.length != o.array.length) {
-                       return this.array.length - o.array.length;
-               }
-               
-               for(int i = 0; i < this.array.length; i++) {
-                       int comp = this.array[i].compareTo(o.array[i]);
-                       if(comp != 0) {
-                               return comp;
-                       }
-               }
-               return 0;
-       }
-       
-       @Override
-       public boolean equals(Object obj) {
-               if(!(obj instanceof StringArrayWritable)) {
-                       return false;
-               }
-               return this.compareTo((StringArrayWritable) obj) == 0;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
deleted file mode 100644
index b797090..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import com.google.common.base.Objects;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.junit.Test;
-
-import java.util.Random;
-
-/**
- * Testing the serialization of classes which are subclasses of a class that 
implements an interface.
- */
-public class SubclassFromInterfaceSerializerTest extends 
SerializerTestBase<SubclassFromInterfaceSerializerTest.TestUserInterface> {
-       private TypeInformation<TestUserInterface> type = 
TypeExtractor.getForClass(TestUserInterface.class);
-
-       @Override
-       protected TypeSerializer<TestUserInterface> createSerializer() {
-               // only register one of the two child classes
-               ExecutionConfig conf = new ExecutionConfig();
-               conf.registerPojoType(TestUserClass2.class);
-               TypeSerializer<TestUserInterface> serializer = 
type.createSerializer(conf);
-               assert(serializer instanceof KryoSerializer);
-               return serializer;
-       }
-
-       @Override
-       protected int getLength() {
-               return -1;
-       }
-
-       @Override
-       protected Class<TestUserInterface> getTypeClass() {
-               return TestUserInterface.class;
-       }
-
-       @Override
-       protected TestUserInterface[] getTestData() {
-               Random rnd = new Random(874597969123412341L);
-
-               return new TestUserInterface[]{
-                               new TestUserClass1(rnd.nextInt(), "foo", 
rnd.nextLong()),
-                               new TestUserClass2(rnd.nextInt(), "bar", 
rnd.nextFloat())
-               };
-
-       }
-
-       @Override
-       @Test
-       public void testInstantiate() {
-               // don't do anything, since the PojoSerializer with subclass 
will return null
-       }
-
-       public interface TestUserInterface {}
-
-       // User code class for testing the serializer
-       public static class TestUserClassBase implements TestUserInterface {
-               public int dumm1;
-               public String dumm2;
-
-
-               public TestUserClassBase() {
-               }
-
-               public TestUserClassBase(int dumm1, String dumm2) {
-                       this.dumm1 = dumm1;
-                       this.dumm2 = dumm2;
-               }
-
-               @Override
-               public int hashCode() {
-                       return Objects.hashCode(dumm1, dumm2);
-               }
-
-               @Override
-               public boolean equals(Object other) {
-                       if (!(other instanceof TestUserClassBase)) {
-                               return false;
-                       }
-                       TestUserClassBase otherTUC = (TestUserClassBase) other;
-                       if (dumm1 != otherTUC.dumm1) {
-                               return false;
-                       }
-                       if (!dumm2.equals(otherTUC.dumm2)) {
-                               return false;
-                       }
-                       return true;
-               }
-       }
-
-       public static class TestUserClass1 extends TestUserClassBase {
-               public long dumm3;
-
-               public TestUserClass1() {
-               }
-
-               public TestUserClass1(int dumm1, String dumm2, long dumm3) {
-                       super(dumm1, dumm2);
-                       this.dumm3 = dumm3;
-               }
-
-               @Override
-               public boolean equals(Object other) {
-                       if (!(other instanceof TestUserClass1)) {
-                               return false;
-                       }
-                       TestUserClass1 otherTUC = (TestUserClass1) other;
-                       if (dumm1 != otherTUC.dumm1) {
-                               return false;
-                       }
-                       if (!dumm2.equals(otherTUC.dumm2)) {
-                               return false;
-                       }
-                       if (dumm3 != otherTUC.dumm3) {
-                               return false;
-                       }
-                       return true;
-               }
-       }
-
-       public static class TestUserClass2 extends TestUserClassBase {
-               public float dumm4;
-
-               public TestUserClass2() {
-               }
-
-               public TestUserClass2(int dumm1, String dumm2, float dumm4) {
-                       super(dumm1, dumm2);
-                       this.dumm4 = dumm4;
-               }
-
-               @Override
-               public boolean equals(Object other) {
-                       if (!(other instanceof TestUserClass2)) {
-                               return false;
-                       }
-                       TestUserClass2 otherTUC = (TestUserClass2) other;
-                       if (dumm1 != otherTUC.dumm1) {
-                               return false;
-                       }
-                       if (!dumm2.equals(otherTUC.dumm2)) {
-                               return false;
-                       }
-                       if (dumm4 != otherTUC.dumm4) {
-                               return false;
-                       }
-                       return true;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
deleted file mode 100644
index 87be6db..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemoryUtils;
-
-public final class TestDataOutputSerializer implements DataOutputView {
-       
-       private byte[] buffer;
-       
-       private int position;
-
-       private ByteBuffer wrapper;
-       
-       private final int maxSize;
-       
-
-       public TestDataOutputSerializer(int startSize) {
-               this(startSize, Integer.MAX_VALUE);
-       }
-       
-       public TestDataOutputSerializer(int startSize, int maxSize) {
-               if (startSize < 1 || startSize > maxSize) {
-                       throw new IllegalArgumentException();
-               }
-
-               this.buffer = new byte[startSize];
-               this.wrapper = ByteBuffer.wrap(buffer);
-               this.maxSize = maxSize;
-       }
-       
-       public ByteBuffer wrapAsByteBuffer() {
-               this.wrapper.position(0);
-               this.wrapper.limit(this.position);
-               return this.wrapper;
-       }
-
-       public byte[] copyByteBuffer() {
-               byte[] target = new byte[position];
-               System.arraycopy(buffer, 0, target, 0, position);
-
-               return target;
-       }
-
-       public void clear() {
-               this.position = 0;
-       }
-
-       public int length() {
-               return this.position;
-       }
-
-       @Override
-       public String toString() {
-               return String.format("[pos=%d cap=%d]", this.position, 
this.buffer.length);
-       }
-
-       // 
----------------------------------------------------------------------------------------
-       //                               Data Output
-       // 
----------------------------------------------------------------------------------------
-       
-       @Override
-       public void write(int b) throws IOException {
-               if (this.position >= this.buffer.length) {
-                       resize(1);
-               }
-               this.buffer[this.position++] = (byte) (b & 0xff);
-       }
-
-       @Override
-       public void write(byte[] b) throws IOException {
-               write(b, 0, b.length);
-       }
-
-       @Override
-       public void write(byte[] b, int off, int len) throws IOException {
-               if (len < 0 || off > b.length - len) {
-                       throw new ArrayIndexOutOfBoundsException();
-               }
-               if (this.position > this.buffer.length - len) {
-                       resize(len);
-               }
-               System.arraycopy(b, off, this.buffer, this.position, len);
-               this.position += len;
-       }
-
-       @Override
-       public void writeBoolean(boolean v) throws IOException {
-               write(v ? 1 : 0);
-       }
-
-       @Override
-       public void writeByte(int v) throws IOException {
-               write(v);
-       }
-
-       @Override
-       public void writeBytes(String s) throws IOException {
-               final int sLen = s.length();
-               if (this.position >= this.buffer.length - sLen) {
-                       resize(sLen);
-               }
-               
-               for (int i = 0; i < sLen; i++) {
-                       writeByte(s.charAt(i));
-               }
-               this.position += sLen;
-       }
-
-       @Override
-       public void writeChar(int v) throws IOException {
-               if (this.position >= this.buffer.length - 1) {
-                       resize(2);
-               }
-               this.buffer[this.position++] = (byte) (v >> 8);
-               this.buffer[this.position++] = (byte) v;
-       }
-
-       @Override
-       public void writeChars(String s) throws IOException {
-               final int sLen = s.length();
-               if (this.position >= this.buffer.length - 2*sLen) {
-                       resize(2*sLen);
-               } 
-               for (int i = 0; i < sLen; i++) {
-                       writeChar(s.charAt(i));
-               }
-       }
-
-       @Override
-       public void writeDouble(double v) throws IOException {
-               writeLong(Double.doubleToLongBits(v));
-       }
-
-       @Override
-       public void writeFloat(float v) throws IOException {
-               writeInt(Float.floatToIntBits(v));
-       }
-
-       @SuppressWarnings("restriction")
-       @Override
-       public void writeInt(int v) throws IOException {
-               if (this.position >= this.buffer.length - 3) {
-                       resize(4);
-               }
-               if (LITTLE_ENDIAN) {
-                       v = Integer.reverseBytes(v);
-               }                       
-               UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v);
-               this.position += 4;
-       }
-
-       @SuppressWarnings("restriction")
-       @Override
-       public void writeLong(long v) throws IOException {
-               if (this.position >= this.buffer.length - 7) {
-                       resize(8);
-               }
-               if (LITTLE_ENDIAN) {
-                       v = Long.reverseBytes(v);
-               }
-               UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v);
-               this.position += 8;
-       }
-
-       @Override
-       public void writeShort(int v) throws IOException {
-               if (this.position >= this.buffer.length - 1) {
-                       resize(2);
-               }
-               this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff);
-               this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff);
-       }
-
-       @Override
-       public void writeUTF(String str) throws IOException {
-               int strlen = str.length();
-               int utflen = 0;
-               int c;
-
-               /* use charAt instead of copying String to char array */
-               for (int i = 0; i < strlen; i++) {
-                       c = str.charAt(i);
-                       if ((c >= 0x0001) && (c <= 0x007F)) {
-                               utflen++;
-                       } else if (c > 0x07FF) {
-                               utflen += 3;
-                       } else {
-                               utflen += 2;
-                       }
-               }
-
-               if (utflen > 65535) {
-                       throw new UTFDataFormatException("Encoded string is too 
long: " + utflen);
-               }
-               else if (this.position > this.buffer.length - utflen - 2) {
-                       resize(utflen + 2);
-               }
-               
-               byte[] bytearr = this.buffer;
-               int count = this.position;
-
-               bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
-               bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
-
-               int i = 0;
-               for (i = 0; i < strlen; i++) {
-                       c = str.charAt(i);
-                       if (!((c >= 0x0001) && (c <= 0x007F))) {
-                               break;
-                       }
-                       bytearr[count++] = (byte) c;
-               }
-
-               for (; i < strlen; i++) {
-                       c = str.charAt(i);
-                       if ((c >= 0x0001) && (c <= 0x007F)) {
-                               bytearr[count++] = (byte) c;
-
-                       } else if (c > 0x07FF) {
-                               bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 
0x0F));
-                               bytearr[count++] = (byte) (0x80 | ((c >> 6) & 
0x3F));
-                               bytearr[count++] = (byte) (0x80 | ((c >> 0) & 
0x3F));
-                       } else {
-                               bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 
0x1F));
-                               bytearr[count++] = (byte) (0x80 | ((c >> 0) & 
0x3F));
-                       }
-               }
-
-               this.position = count;
-       }
-       
-       
-       private void resize(int minCapacityAdd) throws IOException {
-               try {
-                       int newLen = Math.max(this.buffer.length * 2, 
this.buffer.length + minCapacityAdd);
-                       
-                       if (newLen > maxSize) {
-                               
-                               if (this.buffer.length + minCapacityAdd > 
maxSize) {
-                                       throw new EOFException("Exceeded 
maximum capacity");
-                               }
-                               
-                               newLen = maxSize;
-                       }
-                       
-                       final byte[] nb = new byte[newLen];
-                       System.arraycopy(this.buffer, 0, nb, 0, this.position);
-                       this.buffer = nb;
-                       this.wrapper = ByteBuffer.wrap(this.buffer);
-               }
-               catch (NegativeArraySizeException nasex) {
-                       throw new IOException("Serialization failed because the 
record length would exceed 2GB (max addressable array size in Java).");
-               }
-       }
-       
-       @SuppressWarnings("restriction")
-       private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-       
-       @SuppressWarnings("restriction")
-       private static final long BASE_OFFSET = 
UNSAFE.arrayBaseOffset(byte[].class);
-       
-       private static final boolean LITTLE_ENDIAN = 
(MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
-
-       @Override
-       public void skipBytesToWrite(int numBytes) throws IOException {
-               if(buffer.length - this.position < numBytes){
-                       throw new EOFException("Could not skip " + numBytes + " 
bytes.");
-               }
-
-               this.position += numBytes;
-       }
-
-       @Override
-       public void write(DataInputView source, int numBytes) throws 
IOException {
-               if(buffer.length - this.position < numBytes){
-                       throw new EOFException("Could not write " + numBytes + 
" bytes. Buffer overflow.");
-               }
-
-               source.read(this.buffer, this.position, numBytes);
-               this.position += numBytes;
-       }
-       
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
deleted file mode 100644
index cfc4914..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorILD2Test extends 
TupleComparatorTestBase<Tuple3<Integer, Long, Double>> {
-
-       @SuppressWarnings("unchecked")
-       Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
-               new Tuple3<Integer, Long, Double>(4, 14L, 20.0),
-               new Tuple3<Integer, Long, Double>(4, 15L, 23.2),
-               new Tuple3<Integer, Long, Double>(5, 15L, 20.0),
-               new Tuple3<Integer, Long, Double>(5, 20L, 20.0),
-               new Tuple3<Integer, Long, Double>(6, 20L, 23.2),
-               new Tuple3<Integer, Long, Double>(6, 29L, 20.0),
-               new Tuple3<Integer, Long, Double>(7, 29L, 20.0),
-               new Tuple3<Integer, Long, Double>(7, 34L, 23.2)
-       };
-
-       @Override
-       protected TupleComparator<Tuple3<Integer, Long, Double>> 
createComparator(boolean ascending) {
-               return new TupleComparator<Tuple3<Integer, Long, Double>>(
-                               new int[]{0, 1},
-                               new TypeComparator[]{
-                                       new IntComparator(ascending),
-                                       new LongComparator(ascending)
-                               },
-                               new TypeSerializer[]{ IntSerializer.INSTANCE, 
LongSerializer.INSTANCE });
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       protected TupleSerializer<Tuple3<Integer, Long, Double>> 
createSerializer() {
-               return new TupleSerializer<Tuple3<Integer, Long, Double>>(
-                               (Class<Tuple3<Integer, Long, Double>>) 
(Class<?>) Tuple3.class,
-                               new TypeSerializer[]{
-                                       new IntSerializer(),
-                                       new LongSerializer(),
-                                       new DoubleSerializer()});
-       }
-
-       @Override
-       protected Tuple3<Integer, Long, Double>[] getSortedTestData() {
-               return dataISD;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java
deleted file mode 100644
index e5a0e6c..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleComparator;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorILD3Test extends 
TupleComparatorTestBase<Tuple3<Integer, Long, Double>> {
-
-       @SuppressWarnings("unchecked")
-       Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
-               new Tuple3<Integer, Long, Double>(4, Long.valueOf(4), 20.0),
-               new Tuple3<Integer, Long, Double>(4, Long.valueOf(4), 23.2),
-               new Tuple3<Integer, Long, Double>(4, Long.valueOf(9), 20.0),
-               new Tuple3<Integer, Long, Double>(5, Long.valueOf(4), 20.0),
-               new Tuple3<Integer, Long, Double>(5, Long.valueOf(4), 23.2),
-               new Tuple3<Integer, Long, Double>(5, Long.valueOf(9), 20.0),
-               new Tuple3<Integer, Long, Double>(6, Long.valueOf(4), 20.0),
-               new Tuple3<Integer, Long, Double>(6, Long.valueOf(4), 23.2)
-       };
-
-       @Override
-       protected TupleComparator<Tuple3<Integer, Long, Double>> 
createComparator(boolean ascending) {
-               return new TupleComparator<Tuple3<Integer, Long, Double>>(
-                               new int[]{0, 1, 2},
-                               new TypeComparator[]{
-                                       new IntComparator(ascending),
-                                       new LongComparator(ascending),
-                                       new DoubleComparator(ascending)
-                               },
-               new TypeSerializer[]{ IntSerializer.INSTANCE, 
LongSerializer.INSTANCE, DoubleSerializer.INSTANCE });
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       protected TupleSerializer<Tuple3<Integer, Long, Double>> 
createSerializer() {
-               return new TupleSerializer<Tuple3<Integer, Long, Double>>(
-                               (Class<Tuple3<Integer, Long, Double>>) 
(Class<?>) Tuple3.class,
-                               new TypeSerializer[]{
-                                       new IntSerializer(),
-                                       new LongSerializer(),
-                                       new DoubleSerializer()});
-       }
-
-       @Override
-       protected Tuple3<Integer, Long, Double>[] getSortedTestData() {
-               return dataISD;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java
deleted file mode 100644
index a1e6c40..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleComparator;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorILDC3Test extends TupleComparatorTestBase<Tuple3<Integer, Long, Double>> {
-
-       @SuppressWarnings("unchecked")
-       Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
-               new Tuple3<Integer, Long, Double>(4, Long.valueOf(4), 20.0),
-               new Tuple3<Integer, Long, Double>(5, Long.valueOf(1), 20.0),
-               new Tuple3<Integer, Long, Double>(5, Long.valueOf(2), 20.0),
-               new Tuple3<Integer, Long, Double>(5, Long.valueOf(10), 23.0),
-               new Tuple3<Integer, Long, Double>(5, Long.valueOf(19), 24.0),
-               new Tuple3<Integer, Long, Double>(5, Long.valueOf(20), 24.0),
-               new Tuple3<Integer, Long, Double>(5, Long.valueOf(24), 25.0),
-               new Tuple3<Integer, Long, Double>(5, Long.valueOf(25), 25.0)
-       };
-
-       @Override
-       protected TupleComparator<Tuple3<Integer, Long, Double>> createComparator(boolean ascending) {
-               return new TupleComparator<Tuple3<Integer, Long, Double>>(
-                               new int[]{2, 0, 1},
-                               new TypeComparator[]{
-                                       new DoubleComparator(ascending),
-                                       new IntComparator(ascending),
-                                       new LongComparator(ascending)
-                               },
-               new TypeSerializer[]{ IntSerializer.INSTANCE, LongSerializer.INSTANCE, DoubleSerializer.INSTANCE });
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       protected TupleSerializer<Tuple3<Integer, Long, Double>> createSerializer() {
-               return new TupleSerializer<Tuple3<Integer, Long, Double>>(
-                               (Class<Tuple3<Integer, Long, Double>>) (Class<?>) Tuple3.class,
-                               new TypeSerializer[]{
-                                       new IntSerializer(),
-                                       new LongSerializer(),
-                                       new DoubleSerializer()});
-       }
-
-       @Override
-       protected Tuple3<Integer, Long, Double>[] getSortedTestData() {
-               return dataISD;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java
deleted file mode 100644
index b5c0c1f..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorILDX1Test extends TupleComparatorTestBase<Tuple3<Integer, Long, Double>> {
-
-       @SuppressWarnings("unchecked")
-       Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
-               new Tuple3<Integer, Long, Double>(4, Long.valueOf(4), 20.0),
-               new Tuple3<Integer, Long, Double>(4, Long.valueOf(5), 23.2),
-               new Tuple3<Integer, Long, Double>(4, Long.valueOf(9), 20.0),
-               new Tuple3<Integer, Long, Double>(4, Long.valueOf(10), 24.0),
-               new Tuple3<Integer, Long, Double>(4, Long.valueOf(19), 23.2),
-               new Tuple3<Integer, Long, Double>(4, Long.valueOf(20), 24.0),
-               new Tuple3<Integer, Long, Double>(4, Long.valueOf(24), 20.0),
-               new Tuple3<Integer, Long, Double>(4, Long.valueOf(25), 23.2)
-       };
-
-       @Override
-       protected TupleComparator<Tuple3<Integer, Long, Double>> createComparator(boolean ascending) {
-               return new TupleComparator<Tuple3<Integer, Long, Double>>(
-                               new int[]{1},
-                               new TypeComparator[]{
-                                       new LongComparator(ascending)
-                               },
-               new TypeSerializer[]{  IntSerializer.INSTANCE, LongSerializer.INSTANCE });
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       protected TupleSerializer<Tuple3<Integer, Long, Double>> createSerializer() {
-               return new TupleSerializer<Tuple3<Integer, Long, Double>>(
-                               (Class<Tuple3<Integer, Long, Double>>) (Class<?>) Tuple3.class,
-                               new TypeSerializer[]{
-                                       new IntSerializer(),
-                                       new LongSerializer(),
-                                       new DoubleSerializer()});
-       }
-
-       @Override
-       protected Tuple3<Integer, Long, Double>[] getSortedTestData() {
-               return dataISD;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21a71586/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
deleted file mode 100644
index 793a2f4..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleComparator;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
-
-public class TupleComparatorILDXC2Test extends TupleComparatorTestBase<Tuple3<Integer, Long, Double>> {
-
-       @SuppressWarnings("unchecked")
-       Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
-               new Tuple3<Integer, Long, Double>(4, 4L, 20.0),
-               new Tuple3<Integer, Long, Double>(4, 5L, 20.0),
-               new Tuple3<Integer, Long, Double>(4, 3L, 23.0),
-               new Tuple3<Integer, Long, Double>(4, 19L, 23.0),
-               new Tuple3<Integer, Long, Double>(4, 17L, 24.0),
-               new Tuple3<Integer, Long, Double>(4, 18L, 24.0),
-               new Tuple3<Integer, Long, Double>(4, 24L, 25.0),
-               new Tuple3<Integer, Long, Double>(4, 25L, 25.0)
-       };
-
-       @Override
-       protected TupleComparator<Tuple3<Integer, Long, Double>> createComparator(boolean ascending) {
-               return new TupleComparator<Tuple3<Integer, Long, Double>>(
-                               new int[]{2, 1},
-                               new TypeComparator[]{
-                                       new DoubleComparator(ascending),
-                                       new LongComparator(ascending)
-                               },
-               new TypeSerializer[]{ IntSerializer.INSTANCE, DoubleSerializer.INSTANCE, LongSerializer.INSTANCE });
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       protected TupleSerializer<Tuple3<Integer, Long, Double>> createSerializer() {
-               return new TupleSerializer<Tuple3<Integer, Long, Double>>(
-                               (Class<Tuple3<Integer, Long, Double>>) (Class<?>) Tuple3.class,
-                               new TypeSerializer[]{
-                                       new IntSerializer(),
-                                       new LongSerializer(),
-                                       new DoubleSerializer()});
-       }
-
-       @Override
-       protected Tuple3<Integer, Long, Double>[] getSortedTestData() {
-               return dataISD;
-       }
-
-}

Reply via email to