http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
index 51d4b0e..0aa6097 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -66,16 +67,19 @@ public class CoGroupOperatorCollectionTest implements 
Serializable {
                                                        .build()
                        );
 
-                       final RuntimeContext ctx = new RuntimeUDFContext("Test 
UDF", 4, 0, null);
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       final RuntimeContext ctx = new RuntimeUDFContext("Test 
UDF", 4, 0, null, executionConfig);
 
                        {
                                SumCoGroup udf1 = new SumCoGroup();
                                SumCoGroup udf2 = new SumCoGroup();
-                               
+
+                               executionConfig.disableObjectReuse();
                                List<Tuple2<String, Integer>> resultSafe = 
getCoGroupOperator(udf1)
-                                               .executeOnCollections(input1, 
input2, ctx, true);
+                                               .executeOnCollections(input1, 
input2, ctx, executionConfig);
+                               executionConfig.enableObjectReuse();
                                List<Tuple2<String, Integer>> resultRegular = 
getCoGroupOperator(udf2)
-                                               .executeOnCollections(input1, 
input2, ctx, false);
+                                               .executeOnCollections(input1, 
input2, ctx, executionConfig);
 
                                Assert.assertTrue(udf1.isClosed);
                                Assert.assertTrue(udf2.isClosed);
@@ -95,13 +99,15 @@ public class CoGroupOperatorCollectionTest implements 
Serializable {
                        }
 
                        {
+                               executionConfig.disableObjectReuse();
                                List<Tuple2<String, Integer>> resultSafe = 
getCoGroupOperator(new SumCoGroup())
                                                
.executeOnCollections(Collections.<Tuple2<String, Integer>>emptyList(),
-                                                               
Collections.<Tuple2<String, Integer>>emptyList(), ctx, true);
-                               
+                                                               
Collections.<Tuple2<String, Integer>>emptyList(), ctx, executionConfig);
+
+                               executionConfig.enableObjectReuse();
                                List<Tuple2<String, Integer>> resultRegular = 
getCoGroupOperator(new SumCoGroup())
                                                
.executeOnCollections(Collections.<Tuple2<String, Integer>>emptyList(),
-                                                               
Collections.<Tuple2<String, Integer>>emptyList(), ctx, false);
+                                                               
Collections.<Tuple2<String, Integer>>emptyList(), ctx, executionConfig);
 
                                Assert.assertEquals(0, resultSafe.size());
                                Assert.assertEquals(0, resultRegular.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
index cfca5aa..447c8c5 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -79,9 +80,12 @@ public class GroupReduceOperatorTest implements 
java.io.Serializable {
                                        Integer>("foo", 3), new Tuple2<String, 
Integer>("bar", 2), new Tuple2<String,
                                        Integer>("bar", 4)));
 
-                       
-                       List<Tuple2<String, Integer>> resultMutableSafe = 
op.executeOnCollections(input, null, true);
-                       List<Tuple2<String, Integer>> resultRegular = 
op.executeOnCollections(input, null, false);
+
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       executionConfig.disableObjectReuse();
+                       List<Tuple2<String, Integer>> resultMutableSafe = 
op.executeOnCollections(input, null, executionConfig);
+                       executionConfig.enableObjectReuse();
+                       List<Tuple2<String, Integer>> resultRegular = 
op.executeOnCollections(input, null, executionConfig);
                        
                        Set<Tuple2<String, Integer>> resultSetMutableSafe = new 
HashSet<Tuple2<String, Integer>>(resultMutableSafe);
                        Set<Tuple2<String, Integer>> resultSetRegular = new 
HashSet<Tuple2<String, Integer>>(resultRegular);
@@ -155,8 +159,11 @@ public class GroupReduceOperatorTest implements 
java.io.Serializable {
                                        Integer>("foo", 3), new Tuple2<String, 
Integer>("bar", 2), new Tuple2<String,
                                        Integer>("bar", 4)));
 
-                       List<Tuple2<String, Integer>> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), 
true);
-                       List<Tuple2<String, Integer>> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), 
false);
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       executionConfig.disableObjectReuse();
+                       List<Tuple2<String, Integer>> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig), executionConfig);
+                       executionConfig.enableObjectReuse();
+                       List<Tuple2<String, Integer>> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig), executionConfig);
                        
                        
                        Set<Tuple2<String, Integer>> resultSetMutableSafe = new 
HashSet<Tuple2<String, Integer>>(resultMutableSafe);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
index b4ef54f..1b38281 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.api.common.operators.base;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -101,8 +101,11 @@ public class JoinOperatorBaseTest implements Serializable {
                ));
 
                try {
-                       List<Tuple2<Double, String>> resultSafe = 
base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 
1, 0, null), true);
-                       List<Tuple2<Double, String>> resultRegular = 
base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 
1, 0, null), false);
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       executionConfig.disableObjectReuse();
+                       List<Tuple2<Double, String>> resultSafe = 
base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 
1, 0, null, executionConfig), executionConfig);
+                       executionConfig.enableObjectReuse();
+                       List<Tuple2<Double, String>> resultRegular = 
base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 
1, 0, null, executionConfig), executionConfig);
 
                        assertEquals(expected, new HashSet<Tuple2<Double, 
String>>(resultSafe));
                        assertEquals(expected, new HashSet<Tuple2<Double, 
String>>(resultRegular));

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
index 90bbe41..4e1eebd 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -68,8 +69,11 @@ public class ReduceOperatorTest implements 
java.io.Serializable {
                                        Integer>("foo", 3), new Tuple2<String, 
Integer>("bar", 2), new Tuple2<String,
                                        Integer>("bar", 4)));
 
-                       List<Tuple2<String, Integer>> resultMutableSafe = 
op.executeOnCollections(input, null, true);
-                       List<Tuple2<String, Integer>> resultRegular = 
op.executeOnCollections(input, null, false);
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       executionConfig.disableObjectReuse();
+                       List<Tuple2<String, Integer>> resultMutableSafe = 
op.executeOnCollections(input, null, executionConfig);
+                       executionConfig.enableObjectReuse();
+                       List<Tuple2<String, Integer>> resultRegular = 
op.executeOnCollections(input, null, executionConfig);
                        
                        Set<Tuple2<String, Integer>> resultSetMutableSafe = new 
HashSet<Tuple2<String, Integer>>(resultMutableSafe);
                        Set<Tuple2<String, Integer>> resultSetRegular = new 
HashSet<Tuple2<String, Integer>>(resultRegular);
@@ -132,8 +136,11 @@ public class ReduceOperatorTest implements 
java.io.Serializable {
                                        Integer>("foo", 3), new Tuple2<String, 
Integer>("bar", 2), new Tuple2<String,
                                        Integer>("bar", 4)));
 
-                       List<Tuple2<String, Integer>> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), 
true);
-                       List<Tuple2<String, Integer>> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), 
false);
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       executionConfig.disableObjectReuse();
+                       List<Tuple2<String, Integer>> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig), executionConfig);
+                       executionConfig.enableObjectReuse();
+                       List<Tuple2<String, Integer>> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig), executionConfig);
 
                        Set<Tuple2<String, Integer>> resultSetMutableSafe = new 
HashSet<Tuple2<String, Integer>>(resultMutableSafe);
                        Set<Tuple2<String, Integer>> resultSetRegular = new 
HashSet<Tuple2<String, Integer>>(resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 118e707..cd495b5 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -91,7 +92,7 @@ public class CollectionInputFormatTest {
                        TypeInformation<ElementType> info = 
(TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class);
        
                        CollectionInputFormat<ElementType> inputFormat = new 
CollectionInputFormat<ElementType>(inputCollection,
-                                       info.createSerializer());
+                                       info.createSerializer(new 
ExecutionConfig()));
 
                        ByteArrayOutputStream buffer = new 
ByteArrayOutputStream();
                        ObjectOutputStream out = new ObjectOutputStream(buffer);
@@ -168,7 +169,7 @@ public class CollectionInputFormatTest {
                
                try {
                        List<String> inputCollection = Arrays.asList(data);
-                       CollectionInputFormat<String> inputFormat = new 
CollectionInputFormat<String>(inputCollection, 
BasicTypeInfo.STRING_TYPE_INFO.createSerializer());
+                       CollectionInputFormat<String> inputFormat = new 
CollectionInputFormat<String>(inputCollection, 
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()));
                        
                        // serialize
                        ByteArrayOutputStream baos = new 
ByteArrayOutputStream();

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
index 7dd1135..f6e3c2a 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.io;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.BinaryInputFormat;
 import org.apache.flink.api.common.io.BinaryOutputFormat;
 import org.apache.flink.api.common.io.BlockInfo;
@@ -51,7 +52,7 @@ public class TypeSerializerFormatTest extends 
SequentialFormatTestBase<Tuple2<In
 
         resultType = TypeExtractor.getForObject(getRecord(0));
 
-               serializer = resultType.createSerializer();
+               serializer = resultType.createSerializer(new ExecutionConfig());
        }
 
        @Before

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index b96b3bc..dade55c 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -1077,10 +1077,16 @@ public class TypeExtractorTest {
 
        public interface Testable {}
 
-       public abstract class AbstractClass {}
+       public static abstract class AbstractClassWithoutMember {}
+
+       public static abstract class AbstractClassWithMember {
+               public int x;
+       }
 
        @Test
-       public void testAbstractAndInterfaceTypesException() {
+       public void testAbstractAndInterfaceTypes() {
+
+               // interface
                RichMapFunction<String, ?> function = new 
RichMapFunction<String, Testable>() {
                        private static final long serialVersionUID = 1L;
 
@@ -1089,21 +1095,35 @@ public class TypeExtractorTest {
                                return null;
                        }
                };
-               
-               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.STRING_TYPE_INFO, null, 
true);
+
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.STRING_TYPE_INFO);
+               Assert.assertTrue(ti instanceof GenericTypeInfo);
+
+               // abstract class with out class member
+               RichMapFunction<String, ?> function2 = new 
RichMapFunction<String, AbstractClassWithoutMember>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public AbstractClassWithoutMember map(String value) 
throws Exception {
+                               return null;
+                       }
+               };
+
+               ti = TypeExtractor.getMapReturnTypes(function2, 
BasicTypeInfo.STRING_TYPE_INFO);
                Assert.assertTrue(ti instanceof GenericTypeInfo);
 
-               RichMapFunction<String, ?> function2 = new 
RichMapFunction<String, AbstractClass>() {
+               // abstract class with class member
+               RichMapFunction<String, ?> function3 = new 
RichMapFunction<String, AbstractClassWithMember>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
-                       public AbstractClass map(String value) throws Exception 
{
+                       public AbstractClassWithMember map(String value) throws 
Exception {
                                return null;
                        }
                };
 
-               TypeInformation<?> ti2 = 
TypeExtractor.getMapReturnTypes(function2, BasicTypeInfo.STRING_TYPE_INFO, 
null, true);
-               Assert.assertTrue(ti2 instanceof GenericTypeInfo);
+               ti = TypeExtractor.getMapReturnTypes(function3, 
BasicTypeInfo.STRING_TYPE_INFO);
+               Assert.assertTrue(ti instanceof PojoTypeInfo);
        }
 
        @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -1727,7 +1747,7 @@ public class TypeExtractorTest {
                                + "myField1=String,myField2=int"
                                + ">[][][]"));
                
Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<"
-                               + 
"PojoType<org.apache.flink.api.java.type.extractor.TypeExtractorTest.CustomType,
 fields = [myField1: String, myField2: Integer]>"
+                               + 
"PojoType<org.apache.flink.api.java.type.extractor.TypeExtractorTest$CustomType,
 fields = [myField1: String, myField2: Integer]>"
                                + ">>>", ti.toString());
                
                // generic array

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java
index c71625a..33aa449 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java
@@ -52,7 +52,8 @@ public class CompositeTypeTest {
                        inNestedTuple3,
                        BasicTypeInfo.INT_TYPE_INFO     );
 
-       private final PojoTypeInfo<?> pojoTypeInfo = 
((PojoTypeInfo<?>)TypeExtractor.getForClass(MyPojo.class));
+       private final PojoTypeInfo<?> pojoTypeInfo = ((PojoTypeInfo<?>) 
TypeExtractor.getForClass
+                       (MyPojo.class));
 
        private final TupleTypeInfo<?> pojoInTupleTypeInfo = new 
TupleTypeInfo<Tuple2<Integer, MyPojo>>(BasicTypeInfo.INT_TYPE_INFO, 
pojoTypeInfo);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
index c2f3737..eadf96d 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
@@ -28,12 +28,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.api.java.typeutils.ValueTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.types.BooleanValue;
 import org.apache.flink.types.ByteValue;
 import org.apache.flink.types.CharValue;
@@ -300,7 +294,7 @@ public class TypeInfoParserTest {
                // pojos
                ti = 
TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<basic=String>[][][]");
                
Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<"
-                               + 
"PojoType<org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyPojo, fields 
= [basic: String]>"
+                               + 
"PojoType<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo, fields 
= [basic: String]>"
                                + ">>>", ti.toString());
                
                // basic types

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java
index a17f499..b317275 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueComparatorTest.java
@@ -21,8 +21,6 @@ package org.apache.flink.api.java.typeutils.runtime;
 import org.apache.flink.api.common.typeutils.ComparatorTestBase;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator;
-import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer;
 import org.apache.flink.types.StringValue;
 
 public class CopyableValueComparatorTest extends 
ComparatorTestBase<StringValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java
index c190727..d1163d5 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java
@@ -18,11 +18,12 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 public class KryoGenericArraySerializerTest extends 
AbstractGenericArraySerializerTest {
        @Override
        protected <T> TypeSerializer<T> createComponentSerializer(Class<T> 
type) {
-               return new KryoSerializer<T>(type);
+               return new KryoSerializer<T>(type, new ExecutionConfig());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
index 37dba4e..01c76d9 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
@@ -18,11 +18,12 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 public class KryoGenericTypeComparatorTest extends 
AbstractGenericTypeComparatorTest {
        @Override
        protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-               return new KryoSerializer<T>(type);
+               return new KryoSerializer<T>(type, new ExecutionConfig());
        }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
index 5953599..8630d95 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.junit.Test;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
@@ -70,7 +71,7 @@ public class KryoGenericTypeSerializerTest extends 
AbstractGenericTypeSerializer
 
        @Override
        protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-               return new KryoSerializer<T>(type);
+               return new KryoSerializer<T>(type, new ExecutionConfig());
        }
        
        /**
@@ -94,7 +95,7 @@ public class KryoGenericTypeSerializerTest extends 
AbstractGenericTypeSerializer
                        
                        // construct a memory target that is too small for the 
string
                        TestDataOutputSerializer target = new 
TestDataOutputSerializer(10000, 30000);
-                       KryoSerializer<String> serializer = new 
KryoSerializer<String>(String.class);
+                       KryoSerializer<String> serializer = new 
KryoSerializer<String>(String.class, new ExecutionConfig());
                        
                        try {
                                serializer.serialize(str, target);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
index 4c6b39f..b2ed219 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
@@ -27,6 +27,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemoryUtils;
@@ -84,9 +88,12 @@ public class KryoVersusAvroMinibenchmark {
                        System.out.println("Kryo serializer");
                        {
                                final TestDataOutputSerializer outView = new 
TestDataOutputSerializer(100000000);
-                               final KryoSerializer<MyType> serializer = new 
KryoSerializer<MyType>(MyType.class);
-                               serializer.getKryo().register(Tuple2.class);
-                               
+                               ExecutionConfig conf = new ExecutionConfig();
+                               conf.registerKryoType(MyType.class);
+                               conf.enableForceKryo();
+                               TypeInformation<MyType> typeInfo = new 
GenericTypeInfo<MyType>(MyType.class);
+                               final TypeSerializer<MyType> serializer = 
typeInfo.createSerializer(conf);
+
                                long start = System.nanoTime();
                                
                                for (int k = 0; k < NUM_ELEMENTS; k++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoWithCustomSerializersTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoWithCustomSerializersTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoWithCustomSerializersTest.java
index 9d7ab61..7020d80 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoWithCustomSerializersTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoWithCustomSerializersTest.java
@@ -21,7 +21,10 @@ package org.apache.flink.api.java.typeutils.runtime;
 import java.util.Collection;
 import java.util.HashSet;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.joda.time.LocalDate;
 import org.junit.Test;
 
@@ -40,15 +43,16 @@ public class KryoWithCustomSerializersTest extends 
AbstractGenericTypeSerializer
 
                b.add(new LocalDate(1L));
                b.add(new LocalDate(2L));
-               
-               KryoSerializer.registerSerializer(LocalDate.class, 
LocalDateSerializer.class);
-               
+
                runTests(b);
        }
 
        @Override
        protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-               return new KryoSerializer<T>(type);
+               ExecutionConfig conf = new ExecutionConfig();
+               conf.registerKryoSerializer(LocalDate.class, 
LocalDateSerializer.class);
+               TypeInformation<T> typeInfo = new GenericTypeInfo<T>(type);
+               return typeInfo.createSerializer(conf);
        }
        
        public static final class LocalDateSerializer extends 
Serializer<LocalDate> implements java.io.Serializable {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java
index 22b6c76..1c97816 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -35,7 +36,7 @@ public class MultidimensionalArraySerializerTest {
                String[][] array = new 
String[][]{{null,"b"},{"c","d"},{"e","f"},{"g","h"},null};
                TypeInformation<String[][]> ti = 
TypeExtractor.getForClass(String[][].class);
 
-               SerializerTestInstance<String[][]> testInstance = new 
SerializerTestInstance<String[][]>(ti.createSerializer(), String[][].class, -1, 
array);
+               SerializerTestInstance<String[][]> testInstance = new 
SerializerTestInstance<String[][]>(ti.createSerializer(new ExecutionConfig()), 
String[][].class, -1, array);
                testInstance.testAll();
        }
 
@@ -44,7 +45,7 @@ public class MultidimensionalArraySerializerTest {
                int[][] array = new 
int[][]{{12,1},{48,42},{23,80},{484,849},{987,4}};
                TypeInformation<int[][]> ti = 
TypeExtractor.getForClass(int[][].class);
 
-               SerializerTestInstance<int[][]> testInstance = new 
SerializerTestInstance<int[][]>(ti.createSerializer(), int[][].class, -1, 
array);
+               SerializerTestInstance<int[][]> testInstance = new 
SerializerTestInstance<int[][]>(ti.createSerializer(new ExecutionConfig()), 
int[][].class, -1, array);
                testInstance.testAll();
        }
 
@@ -73,13 +74,13 @@ public class MultidimensionalArraySerializerTest {
                Integer[][] array = new Integer[][]{{0,1}, null, {null, 42}};
                TypeInformation<Integer[][]> ti = 
TypeExtractor.getForClass(Integer[][].class);
 
-               SerializerTestInstance<Integer[][]> testInstance = new 
SerializerTestInstance<Integer[][]>(ti.createSerializer(), Integer[][].class, 
-1, array);
+               SerializerTestInstance<Integer[][]> testInstance = new 
SerializerTestInstance<Integer[][]>(ti.createSerializer(new ExecutionConfig()), 
Integer[][].class, -1, array);
                testInstance.testAll();
 
                MyPojo[][] array2 = new MyPojo[][]{{new MyPojo(null, 42), new 
MyPojo("test2", -1)}, {null, null}, null};
                TypeInformation<MyPojo[][]> ti2 = 
TypeExtractor.getForClass(MyPojo[][].class);
 
-               SerializerTestInstance<MyPojo[][]> testInstance2 = new 
SerializerTestInstance<MyPojo[][]>(ti2.createSerializer(), MyPojo[][].class, 
-1, array2);
+               SerializerTestInstance<MyPojo[][]> testInstance2 = new 
SerializerTestInstance<MyPojo[][]>(ti2.createSerializer(new ExecutionConfig()), 
MyPojo[][].class, -1, array2);
                testInstance2.testAll();
        }
 
@@ -112,7 +113,7 @@ public class MultidimensionalArraySerializerTest {
                };
                TypeInformation ti = 
TypeInfoParser.parse("org.apache.flink.api.java.typeutils.runtime.MultidimensionalArraySerializerTest$MyGenericPojo<field=String[][]>[][]");
 
-               SerializerTestInstance testInstance = new 
SerializerTestInstance(ti.createSerializer(), MyGenericPojo[][].class, -1, 
(Object) array);
+               SerializerTestInstance testInstance = new 
SerializerTestInstance(ti.createSerializer(new ExecutionConfig()), 
MyGenericPojo[][].class, -1, (Object) array);
                testInstance.testAll();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
index e53f48a..1baf443 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import java.util.Arrays;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.ComparatorTestBase;
 import org.apache.flink.api.common.typeutils.CompositeType;
@@ -47,12 +48,12 @@ public class PojoComparatorTest extends 
ComparatorTestBase<PojoContainingTuple>
                ExpressionKeys<PojoContainingTuple> keys = new 
ExpressionKeys<PojoContainingTuple>(new String[] {"theTuple.*"}, cType);
                boolean[] orders = new boolean[keys.getNumberOfKeyFields()];
                Arrays.fill(orders, ascending);
-               return 
cType.createComparator(keys.computeLogicalKeyPositions(), orders, 0);
+               return 
cType.createComparator(keys.computeLogicalKeyPositions(), orders, 0, new 
ExecutionConfig());
        }
 
        @Override
        protected TypeSerializer<PojoContainingTuple> createSerializer() {
-               return type.createSerializer();
+               return type.createSerializer(new ExecutionConfig());
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java
index a176178..d405412 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoGenericTypeSerializerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -27,6 +28,6 @@ public class PojoGenericTypeSerializerTest extends 
AbstractGenericTypeSerializer
        @Override
        protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
                TypeInformation<T> typeInfo = TypeExtractor.getForClass(type);
-               return typeInfo.createSerializer();
+               return typeInfo.createSerializer(new ExecutionConfig());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 006625e..1fa7163 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -40,14 +41,14 @@ import org.junit.Test;
 import com.google.common.base.Objects;
 
 /**
- * A test for the {@link 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer}.
+ * A test for the {@link PojoSerializer}.
  */
 public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.TestUserClass> {
        private TypeInformation<TestUserClass> type = 
TypeExtractor.getForClass(TestUserClass.class);
 
        @Override
        protected TypeSerializer<TestUserClass> createSerializer() {
-               TypeSerializer<TestUserClass> serializer = 
type.createSerializer();
+               TypeSerializer<TestUserClass> serializer = 
type.createSerializer(new ExecutionConfig());
                assert(serializer instanceof PojoSerializer);
                return serializer;
        }
@@ -199,14 +200,14 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                pType.getFlatFields("nestedClass.dumm2", 0, result);
                int[] fields = new int[1]; // see below
                fields[0] = result.get(0).getPosition();
-               TypeComparator<TestUserClass> pojoComp = 
pType.createComparator( fields, new boolean[]{true}, 0);
+               TypeComparator<TestUserClass> pojoComp = 
pType.createComparator( fields, new boolean[]{true}, 0, new ExecutionConfig());
                
                TestUserClass pojoTestRecord = new TestUserClass(0, "abc", 3d, 
new int[] {1,2,3}, new NestedTestUserClass(1, "haha", 4d, new int[] {5,4,3}));
                int pHash = pojoComp.hash(pojoTestRecord);
                
                Tuple1<String> tupleTest = new Tuple1<String>("haha");
                TupleTypeInfo<Tuple1<String>> tType = 
(TupleTypeInfo<Tuple1<String>>)TypeExtractor.getForObject(tupleTest);
-               TypeComparator<Tuple1<String>> tupleComp = 
tType.createComparator(new int[] {0}, new boolean[] {true}, 0);
+               TypeComparator<Tuple1<String>> tupleComp = 
tType.createComparator(new int[] {0}, new boolean[] {true}, 0, new 
ExecutionConfig());
                
                int tHash = tupleComp.hash(tupleTest);
                
@@ -223,12 +224,12 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                        e.printStackTrace();
                        Assert.fail("Keys must be compatible: "+e.getMessage());
                }
-               TypeComparator<TestUserClass> multiPojoComp = 
pType.createComparator( expressKey.computeLogicalKeyPositions(), new 
boolean[]{true, true, true}, 0);
+               TypeComparator<TestUserClass> multiPojoComp = 
pType.createComparator( expressKey.computeLogicalKeyPositions(), new 
boolean[]{true, true, true}, 0, new ExecutionConfig());
                int multiPojoHash = multiPojoComp.hash(pojoTestRecord);
                
                
                // pojo order is: dumm2 (str), dumm1 (int), dumm3 (double).
-               TypeComparator<Tuple3<Integer, String, Double>> multiTupleComp 
= multiTupleType.createComparator(fieldKey.computeLogicalKeyPositions(), new 
boolean[] {true, true,true}, 0);
+               TypeComparator<Tuple3<Integer, String, Double>> multiTupleComp 
= multiTupleType.createComparator(fieldKey.computeLogicalKeyPositions(), new 
boolean[] {true, true,true}, 0, new ExecutionConfig());
                int multiTupleHash = multiTupleComp.hash(multiTupleTest);
                
                Assert.assertTrue("The hashing for tuples and pojos must be the 
same, so that they are mixable. Also for those with multiple key fields", 
multiPojoHash == multiTupleHash);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassComparatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassComparatorTest.java
new file mode 100644
index 0000000..3a03683
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassComparatorTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Assert;
+
+import java.util.Arrays;
+
+
+public class PojoSubclassComparatorTest extends 
ComparatorTestBase<PojoContainingTuple> {
+       TypeInformation<PojoContainingTuple> type = 
TypeExtractor.getForClass(PojoContainingTuple.class);
+       
+       PojoContainingTuple[] data = new PojoContainingTuple[]{
+               new Subclass(1, 1L, 1L, 17L),
+               new Subclass(2, 2L, 2L, 42L),
+               new Subclass(8519, 85190L, 85190L, 117L),
+               new Subclass(8520, 85191L, 85191L, 93L),
+       };
+
+       @Override
+       protected TypeComparator<PojoContainingTuple> createComparator(boolean 
ascending) {
+               Assert.assertTrue(type instanceof CompositeType);
+               CompositeType<PojoContainingTuple> cType = 
(CompositeType<PojoContainingTuple>) type;
+               ExpressionKeys<PojoContainingTuple> keys = new 
ExpressionKeys<PojoContainingTuple>(new String[] {"theTuple.*"}, cType);
+               boolean[] orders = new boolean[keys.getNumberOfKeyFields()];
+               Arrays.fill(orders, ascending);
+               return 
cType.createComparator(keys.computeLogicalKeyPositions(), orders, 0, new 
ExecutionConfig());
+       }
+
+       @Override
+       protected TypeSerializer<PojoContainingTuple> createSerializer() {
+               return type.createSerializer(new ExecutionConfig());
+       }
+
+       @Override
+       protected PojoContainingTuple[] getSortedTestData() {
+               return data;
+       }
+
+       public static class Subclass extends PojoContainingTuple {
+
+               public long additionalField;
+
+               public Subclass() {
+               }
+
+               public Subclass(int i, long l1, long l2, long additionalField) {
+                       super(i, l1, l2);
+                       this.additionalField = additionalField;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
new file mode 100644
index 0000000..8c61a19
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.google.common.base.Objects;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Test;
+
+import java.util.Random;
+
+/**
+ * A test for the {@link PojoSerializer}.
+ */
+public class PojoSubclassSerializerTest extends 
SerializerTestBase<PojoSubclassSerializerTest.TestUserClassBase> {
+       private TypeInformation<TestUserClassBase> type = 
TypeExtractor.getForClass(TestUserClassBase.class);
+
+       @Override
+       protected TypeSerializer<TestUserClassBase> createSerializer() {
+               // only register one of the three child classes, the third 
child class is NO POJO
+               ExecutionConfig conf = new ExecutionConfig();
+               conf.registerPojoType(TestUserClass1.class);
+               TypeSerializer<TestUserClassBase> serializer = 
type.createSerializer(conf);
+               assert(serializer instanceof PojoSerializer);
+               return serializer;
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected Class<TestUserClassBase> getTypeClass() {
+               return TestUserClassBase.class;
+       }
+
+       @Override
+       protected TestUserClassBase[] getTestData() {
+               Random rnd = new Random(874597969123412341L);
+
+               return new TestUserClassBase[]{
+                               new TestUserClass1(rnd.nextInt(), "foo", 
rnd.nextLong()),
+                               new TestUserClass2(rnd.nextInt(), "bar", 
rnd.nextFloat()),
+                               new TestUserClass3(rnd.nextInt(), "bar", 
rnd.nextFloat())
+               };
+
+       }
+
+       @Override
+       @Test
+       public void testInstantiate() {
+               // don't do anything, since the PojoSerializer with subclass 
will return null
+       }
+
+       // User code class for testing the serializer
+       public static abstract class TestUserClassBase {
+               public int dumm1;
+               public String dumm2;
+
+
+               public TestUserClassBase() {
+               }
+
+               public TestUserClassBase(int dumm1, String dumm2) {
+                       this.dumm1 = dumm1;
+                       this.dumm2 = dumm2;
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hashCode(dumm1, dumm2);
+               }
+
+               @Override
+               public boolean equals(Object other) {
+                       if (!(other instanceof TestUserClassBase)) {
+                               return false;
+                       }
+                       TestUserClassBase otherTUC = (TestUserClassBase) other;
+                       if (dumm1 != otherTUC.dumm1) {
+                               return false;
+                       }
+                       if (!dumm2.equals(otherTUC.dumm2)) {
+                               return false;
+                       }
+                       return true;
+               }
+       }
+
+       public static class TestUserClass1 extends TestUserClassBase {
+               public long dumm3;
+
+               public TestUserClass1() {
+               }
+
+               public TestUserClass1(int dumm1, String dumm2, long dumm3) {
+                       super(dumm1, dumm2);
+                       this.dumm3 = dumm3;
+               }
+
+               @Override
+               public boolean equals(Object other) {
+                       if (!(other instanceof TestUserClass1)) {
+                               return false;
+                       }
+                       TestUserClass1 otherTUC = (TestUserClass1) other;
+                       if (dumm1 != otherTUC.dumm1) {
+                               return false;
+                       }
+                       if (!dumm2.equals(otherTUC.dumm2)) {
+                               return false;
+                       }
+                       if (dumm3 != otherTUC.dumm3) {
+                               return false;
+                       }
+                       return true;
+               }
+       }
+
+       public static class TestUserClass2 extends TestUserClassBase {
+               public float dumm4;
+
+               public TestUserClass2() {
+               }
+
+               public TestUserClass2(int dumm1, String dumm2, float dumm4) {
+                       super(dumm1, dumm2);
+                       this.dumm4 = dumm4;
+               }
+
+               @Override
+               public boolean equals(Object other) {
+                       if (!(other instanceof TestUserClass2)) {
+                               return false;
+                       }
+                       TestUserClass2 otherTUC = (TestUserClass2) other;
+                       if (dumm1 != otherTUC.dumm1) {
+                               return false;
+                       }
+                       if (!dumm2.equals(otherTUC.dumm2)) {
+                               return false;
+                       }
+                       if (dumm4 != otherTUC.dumm4) {
+                               return false;
+                       }
+                       return true;
+               }
+       }
+
+       public static class TestUserClass3 extends TestUserClassBase {
+               public float dumm4;
+
+               public TestUserClass3(int dumm1, String dumm2, float dumm4) {
+                       super(dumm1, dumm2);
+                       this.dumm4 = dumm4;
+               }
+
+               @Override
+               public boolean equals(Object other) {
+                       if (!(other instanceof TestUserClass3)) {
+                               return false;
+                       }
+                       TestUserClass3 otherTUC = (TestUserClass3) other;
+                       if (dumm1 != otherTUC.dumm1) {
+                               return false;
+                       }
+                       if (!dumm2.equals(otherTUC.dumm2)) {
+                               return false;
+                       }
+                       if (dumm4 != otherTUC.dumm4) {
+                               return false;
+                       }
+                       return true;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
new file mode 100644
index 0000000..efb1b9b
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/SubclassFromInterfaceSerializerTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.google.common.base.Objects;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.junit.Test;
+
+import java.util.Random;
+
+/**
+ * Testing the serialization of classes which are subclasses of a class that 
implements an interface.
+ */
+public class SubclassFromInterfaceSerializerTest extends 
SerializerTestBase<SubclassFromInterfaceSerializerTest.TestUserInterface> {
+       private TypeInformation<TestUserInterface> type = 
TypeExtractor.getForClass(TestUserInterface.class);
+
+       @Override
+       protected TypeSerializer<TestUserInterface> createSerializer() {
+               // only register one of the two child classes
+               ExecutionConfig conf = new ExecutionConfig();
+               conf.registerPojoType(TestUserClass2.class);
+               TypeSerializer<TestUserInterface> serializer = 
type.createSerializer(conf);
+               assert(serializer instanceof KryoSerializer);
+               return serializer;
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected Class<TestUserInterface> getTypeClass() {
+               return TestUserInterface.class;
+       }
+
+       @Override
+       protected TestUserInterface[] getTestData() {
+               Random rnd = new Random(874597969123412341L);
+
+               return new TestUserInterface[]{
+                               new TestUserClass1(rnd.nextInt(), "foo", 
rnd.nextLong()),
+                               new TestUserClass2(rnd.nextInt(), "bar", 
rnd.nextFloat())
+               };
+
+       }
+
+       @Override
+       @Test
+       public void testInstantiate() {
+               // don't do anything, since the PojoSerializer with subclass 
will return null
+       }
+
+       public interface TestUserInterface {}
+
+       // User code class for testing the serializer
+       public static class TestUserClassBase implements TestUserInterface {
+               public int dumm1;
+               public String dumm2;
+
+
+               public TestUserClassBase() {
+               }
+
+               public TestUserClassBase(int dumm1, String dumm2) {
+                       this.dumm1 = dumm1;
+                       this.dumm2 = dumm2;
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hashCode(dumm1, dumm2);
+               }
+
+               @Override
+               public boolean equals(Object other) {
+                       if (!(other instanceof TestUserClassBase)) {
+                               return false;
+                       }
+                       TestUserClassBase otherTUC = (TestUserClassBase) other;
+                       if (dumm1 != otherTUC.dumm1) {
+                               return false;
+                       }
+                       if (!dumm2.equals(otherTUC.dumm2)) {
+                               return false;
+                       }
+                       return true;
+               }
+       }
+
+       public static class TestUserClass1 extends TestUserClassBase {
+               public long dumm3;
+
+               public TestUserClass1() {
+               }
+
+               public TestUserClass1(int dumm1, String dumm2, long dumm3) {
+                       super(dumm1, dumm2);
+                       this.dumm3 = dumm3;
+               }
+
+               @Override
+               public boolean equals(Object other) {
+                       if (!(other instanceof TestUserClass1)) {
+                               return false;
+                       }
+                       TestUserClass1 otherTUC = (TestUserClass1) other;
+                       if (dumm1 != otherTUC.dumm1) {
+                               return false;
+                       }
+                       if (!dumm2.equals(otherTUC.dumm2)) {
+                               return false;
+                       }
+                       if (dumm3 != otherTUC.dumm3) {
+                               return false;
+                       }
+                       return true;
+               }
+       }
+
+       public static class TestUserClass2 extends TestUserClassBase {
+               public float dumm4;
+
+               public TestUserClass2() {
+               }
+
+               public TestUserClass2(int dumm1, String dumm2, float dumm4) {
+                       super(dumm1, dumm2);
+                       this.dumm4 = dumm4;
+               }
+
+               @Override
+               public boolean equals(Object other) {
+                       if (!(other instanceof TestUserClass2)) {
+                               return false;
+                       }
+                       TestUserClass2 otherTUC = (TestUserClass2) other;
+                       if (dumm1 != otherTUC.dumm1) {
+                               return false;
+                       }
+                       if (!dumm2.equals(otherTUC.dumm2)) {
+                               return false;
+                       }
+                       if (dumm4 != otherTUC.dumm4) {
+                               return false;
+                       }
+                       return true;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
index 9f1c7b8..cfc4914 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
@@ -26,8 +26,6 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongComparator;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 
 import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java
index 8d13ab7..e5a0e6c 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD3Test.java
@@ -27,8 +27,6 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongComparator;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 
 import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java
index 82b8b4e..a1e6c40 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDC3Test.java
@@ -27,8 +27,6 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongComparator;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 
 import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java
index 25de450..b5c0c1f 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDX1Test.java
@@ -25,8 +25,6 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongComparator;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 
 import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
index 2a5fc29..793a2f4 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
@@ -26,8 +26,6 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongComparator;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 
 import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
index b2218ad..8cdee9b 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD1Test.java
@@ -25,8 +25,6 @@ import 
org.apache.flink.api.common.typeutils.base.IntComparator;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 
 import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
index fe152a3..06c292f 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD2Test.java
@@ -26,8 +26,6 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringComparator;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 
 import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
index b92d825..d823a29 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorISD3Test.java
@@ -27,8 +27,6 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringComparator;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 
 import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
index a701a82..96f8306 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 import java.util.ArrayList;
 import java.util.Random;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
@@ -209,7 +210,7 @@ public class TupleSerializerTest {
        private <T extends Tuple> void runTests(T... instances) {
                try {
                        TupleTypeInfo<T> tupleTypeInfo = (TupleTypeInfo<T>) 
TypeExtractor.getForObject(instances[0]);
-                       TypeSerializer<T> serializer = 
tupleTypeInfo.createSerializer();
+                       TypeSerializer<T> serializer = 
tupleTypeInfo.createSerializer(new ExecutionConfig());
                        
                        Class<T> tupleClass = tupleTypeInfo.getTypeClass();
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
index e710187..cf9874d 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorTest.java
@@ -21,8 +21,6 @@ package org.apache.flink.api.java.typeutils.runtime;
 import org.apache.flink.api.common.typeutils.ComparatorTestBase;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.ValueComparator;
-import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
 import org.apache.flink.types.StringValue;
 
 public class ValueComparatorTest extends ComparatorTestBase<StringValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
index ce1147a..f5a90b7 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorTest.java
@@ -21,8 +21,6 @@ package org.apache.flink.api.java.typeutils.runtime;
 import org.apache.flink.api.common.typeutils.ComparatorTestBase;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.WritableComparator;
-import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
 
 public class WritableComparatorTest extends 
ComparatorTestBase<StringArrayWritable> {  
        

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
index 01349bf..557c0e4 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.WritableTypeInfo;
-import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
 import org.junit.Test;
 
 public class WritableSerializerTest {
@@ -41,7 +41,7 @@ public class WritableSerializerTest {
                };
                
                WritableTypeInfo<StringArrayWritable> writableTypeInfo = 
(WritableTypeInfo<StringArrayWritable>) TypeExtractor.getForObject(data[0]);
-               WritableSerializer<StringArrayWritable> writableSerializer = 
(WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer();
+               WritableSerializer<StringArrayWritable> writableSerializer = 
(WritableSerializer<StringArrayWritable>) writableTypeInfo.createSerializer(new 
ExecutionConfig());
                
                SerializerTestInstance<StringArrayWritable> testInstance = new 
SerializerTestInstance<StringArrayWritable>(writableSerializer,writableTypeInfo.getTypeClass(),
 -1, data);
                

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
 
b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
index 49a3fe5..6edd2f7 100644
--- 
a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ 
b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.MapPartitionFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 4f10683..d213069 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.iterative.task;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.aggregators.Aggregator;
@@ -165,7 +166,7 @@ public abstract class AbstractIterativePactTask<S extends 
Function, OT> extends
        public DistributedRuntimeUDFContext createRuntimeContext(String 
taskName) {
                Environment env = getEnvironment();
                return new IterativeRuntimeUdfContext(taskName, 
env.getNumberOfSubtasks(),
-                               env.getIndexInSubtaskGroup(), 
getUserCodeClassLoader());
+                               env.getIndexInSubtaskGroup(), 
getUserCodeClassLoader(), getExecutionConfig());
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -355,8 +356,8 @@ public abstract class AbstractIterativePactTask<S extends 
Function, OT> extends
 
        private class IterativeRuntimeUdfContext extends 
DistributedRuntimeUDFContext implements IterationRuntimeContext {
 
-               public IterativeRuntimeUdfContext(String name, int 
numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader) {
-                       super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader);
+               public IterativeRuntimeUdfContext(String name, int 
numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, 
ExecutionConfig executionConfig) {
+                       super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index ff1b272..42bbe3c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -18,14 +18,21 @@
 
 package org.apache.flink.runtime.jobgraph.tasks;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Abstract base class for every task class in Flink.
  */
 public abstract class AbstractInvokable {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(AbstractInvokable.class);
+
+
        /**
         * The environment assigned to this invokable.
         */
@@ -111,6 +118,26 @@ public abstract class AbstractInvokable {
        }
 
        /**
+        * Returns the global ExecutionConfig.
+        */
+       public ExecutionConfig getExecutionConfig() {
+               try {
+                       ExecutionConfig c = (ExecutionConfig) 
InstantiationUtil.readObjectFromConfig(
+                                       getJobConfiguration(),
+                                       ExecutionConfig.CONFIG_KEY,
+                                       this.getClass().getClassLoader());
+                       if (c != null) {
+                               return c;
+                       } else {
+                               return new ExecutionConfig();
+                       }
+               } catch (Exception e) {
+                       LOG.warn("Could not load ExecutionConfig from 
Environment, returning default ExecutionConfig: {}", e);
+                       return new ExecutionConfig();
+               }
+       }
+
+       /**
         * This method is called when a task is canceled either as a result of 
a user abort or an execution failure. It can
         * be overwritten to respond to shut down the user code properly.
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 330f438..01ab533 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -74,24 +74,6 @@ public class DataSourceTask<OT> extends AbstractInvokable {
        // cancel flag
        private volatile boolean taskCanceled = false;
 
-       private ExecutionConfig getExecutionConfig() {
-               ExecutionConfig executionConfig = new ExecutionConfig();
-               try {
-                       ExecutionConfig c = (ExecutionConfig) 
InstantiationUtil.readObjectFromConfig(
-                                       getJobConfiguration(),
-                                       ExecutionConfig.CONFIG_KEY,
-                                       this.getClass().getClassLoader());
-                       if (c != null) {
-                               executionConfig = c;
-                       }
-               } catch (IOException e) {
-                       throw new RuntimeException("Could not load 
ExecutionConfig from Job Configuration: " + e);
-               } catch (ClassNotFoundException e) {
-                       throw new RuntimeException("Could not load 
ExecutionConfig from Job Configuration: " + e);
-               }
-               return executionConfig;
-       }
-
        @Override
        public void registerInputOutput() {
                initInputFormat();

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
index f6d2f4a..ffe09cb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
@@ -45,8 +45,6 @@ public interface PactTaskContext<S, OT> {
        
        TaskConfig getTaskConfig();
 
-       ExecutionConfig getExecutionConfig();
-
        ClassLoader getUserCodeClassLoader();
        
        MemoryManager getMemoryManager();
@@ -60,7 +58,9 @@ public interface PactTaskContext<S, OT> {
        <X> TypeComparator<X> getDriverComparator(int index);
        
        S getStub();
-       
+
+       ExecutionConfig getExecutionConfig();
+
        Collector<OT> getOutputCollector();
        
        AbstractInvokable getOwningNepheleTask();

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 6b6918d..9118400 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -1068,7 +1068,7 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
        public DistributedRuntimeUDFContext createRuntimeContext(String 
taskName) {
                Environment env = getEnvironment();
                return new DistributedRuntimeUDFContext(taskName, 
env.getNumberOfSubtasks(),
-                               env.getIndexInSubtaskGroup(), 
getUserCodeClassLoader(), env.getCopyTask());
+                               env.getIndexInSubtaskGroup(), 
getUserCodeClassLoader(), getExecutionConfig(), env.getCopyTask());
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -1081,23 +1081,6 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
        }
 
        @Override
-       public ExecutionConfig getExecutionConfig() {
-               try {
-                       ExecutionConfig c = (ExecutionConfig) 
InstantiationUtil.readObjectFromConfig(
-                                       
getOwningNepheleTask().getJobConfiguration(),
-                                       ExecutionConfig.CONFIG_KEY,
-                                       this.getClass().getClassLoader());
-                       if (c != null) {
-                               return c;
-                       } else {
-                               return new ExecutionConfig();
-                       }
-               } catch (Exception e) {
-                       throw new RuntimeException("Could not load 
ExecutionConfig from Job Configuration: " + e);
-               }
-       }
-
-       @Override
        public MemoryManager getMemoryManager() {
                return getEnvironment().getMemoryManager();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index 838286e..c84d888 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -62,7 +62,7 @@ public abstract class ChainedDriver<IT, OT> implements 
Collector<IT> {
                } else {
                        Environment env = parent.getEnvironment();
                        this.udfContext = new 
DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(),
-                                       env.getIndexInSubtaskGroup(), 
userCodeClassLoader, env.getCopyTask());
+                                       env.getIndexInSubtaskGroup(), 
userCodeClassLoader, parent.getExecutionConfig(), env.getCopyTask());
                }
 
                this.executionConfig = executionConfig;

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
index 0080d63..ce63021 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -102,6 +103,8 @@ public class LargeRecordHandler<T> {
        
        private volatile boolean closed;
 
+       private final ExecutionConfig executionConfig;
+
        // 
--------------------------------------------------------------------------------------------
        
        public LargeRecordHandler(TypeSerializer<T> serializer, 
TypeComparator<T> comparator, 
@@ -115,7 +118,9 @@ public class LargeRecordHandler<T> {
                this.memory = checkNotNull(memory);
                this.memoryOwner = checkNotNull(memoryOwner);
                this.maxFilehandles = maxFilehandles;
-               
+
+               this.executionConfig = memoryOwner.getExecutionConfig();
+
                checkArgument(maxFilehandles >= 2);
        }
        
@@ -374,13 +379,13 @@ public class LargeRecordHandler<T> {
        
        // 
--------------------------------------------------------------------------------------------
        
-       private static TypeSerializer<Object> createSerializer(Object key, int 
pos) {
+       private TypeSerializer<Object> createSerializer(Object key, int pos) {
                if (key == null) {
                        throw new NullKeyFieldException(pos);
                }
                try {
                        TypeInformation<Object> info = 
TypeExtractor.getForObject(key);
-                       return info.createSerializer();
+                       return info.createSerializer(executionConfig);
                }
                catch (Throwable t) {
                        throw new RuntimeException("Could not create key 
serializer for type " + key);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
index aa7aec6..e841066 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.FutureTask;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
@@ -40,12 +41,12 @@ public class DistributedRuntimeUDFContext extends 
AbstractRuntimeUDFContext {
        private final HashMap<String, BroadcastVariableMaterialization<?, ?>> 
broadcastVars = new HashMap<String, BroadcastVariableMaterialization<?, ?>>();
        
        
-       public DistributedRuntimeUDFContext(String name, int 
numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader) {
-               super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader);
+       public DistributedRuntimeUDFContext(String name, int 
numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, 
ExecutionConfig executionConfig) {
+               super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig);
        }
        
-       public DistributedRuntimeUDFContext(String name, int 
numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, 
Map<String, FutureTask<Path>> cpTasks) {
-               super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, cpTasks);
+       public DistributedRuntimeUDFContext(String name, int 
numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, 
ExecutionConfig executionConfig, Map<String, FutureTask<Path>> cpTasks) {
+               super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig, cpTasks);
        }
        
 

Reply via email to