http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
index 5163801..56e2680 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.typeinfo;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import 
org.apache.flink.api.common.typeutils.base.array.BooleanPrimitiveArraySerializer;
@@ -86,7 +87,7 @@ public class PrimitiveArrayTypeInfo<T> extends 
TypeInformation<T> {
        }
 
        @Override
-       public TypeSerializer<T> createSerializer() {
+       public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
                return this.serializer;
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index 0f86486..0ed5afd 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -18,10 +18,13 @@
 
 package org.apache.flink.api.common.typeinfo;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
-public abstract class TypeInformation<T> {
-       
+import java.io.Serializable;
+
+public abstract class TypeInformation<T> implements Serializable {
+
        public abstract boolean isBasicType();
        
        public abstract boolean isTupleType();
@@ -32,7 +35,7 @@ public abstract class TypeInformation<T> {
        
        public abstract boolean isKeyType();
        
-       public abstract TypeSerializer<T> createSerializer();
+       public abstract TypeSerializer<T> createSerializer(ExecutionConfig 
config);
        
        /**
         * @return The number of fields in this type, including its sub-fields 
(for compsite types) 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index c3ea0e4..f3e4cfa 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.typeutils;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
@@ -91,7 +92,7 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
        /**
         * Get the actual comparator we've initialized.
         */
-       protected abstract TypeComparator<T> getNewComparator();
+       protected abstract TypeComparator<T> getNewComparator(ExecutionConfig 
config);
        
        
        /**
@@ -99,7 +100,7 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
         * to create the actual comparators
         * @return The comparator
         */
-       public TypeComparator<T> createComparator(int[] logicalKeyFields, 
boolean[] orders, int logicalFieldOffset) {
+       public TypeComparator<T> createComparator(int[] logicalKeyFields, 
boolean[] orders, int logicalFieldOffset, ExecutionConfig config) {
                initializeNewComparator(logicalKeyFields.length);
                
                for(int logicalKeyFieldIndex = 0; logicalKeyFieldIndex < 
logicalKeyFields.length; logicalKeyFieldIndex++) {
@@ -110,13 +111,13 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
                                
                                if(localFieldType instanceof AtomicType && 
logicalField == logicalKeyField) {
                                        // we found an atomic key --> create 
comparator
-                                       addCompareField(localFieldId, 
((AtomicType<?>) localFieldType).createComparator(orders[logicalKeyFieldIndex]) 
);
+                                       addCompareField(localFieldId, 
((AtomicType<?>) localFieldType).createComparator(orders[logicalKeyFieldIndex], 
config) );
                                } else if(localFieldType instanceof 
CompositeType  && // must be a composite type
                                                ( logicalField <= 
logicalKeyField //check if keyField can be at or behind the current logicalField
                                                && logicalKeyField <= 
logicalField + (localFieldType.getTotalFields() - 1) ) // check if logical 
field + lookahead could contain our key
                                                ) {
                                        // we found a compositeType that is 
containing the logicalKeyField we are looking for --> create comparator
-                                       addCompareField(localFieldId, 
((CompositeType<?>) localFieldType).createComparator(new int[] 
{logicalKeyField}, new boolean[] {orders[logicalKeyFieldIndex]}, logicalField));
+                                       addCompareField(localFieldId, 
((CompositeType<?>) localFieldType).createComparator(new int[] 
{logicalKeyField}, new boolean[] {orders[logicalKeyFieldIndex]}, logicalField, 
config));
                                }
                                
                                // maintain logicalField
@@ -127,7 +128,7 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
                                logicalField++;
                        }
                }
-               return getNewComparator();
+               return getNewComparator(config);
        }
        
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
index 7d701dc..c77a1b6 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.junit.Test;
 
@@ -33,7 +34,7 @@ public class RuntimeUDFContextTest {
        @Test
        public void testBroadcastVariableNotFound() {
                try {
-                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader());
+                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
                        
                        try {
                                ctx.getBroadcastVariable("some name");
@@ -63,7 +64,7 @@ public class RuntimeUDFContextTest {
        @Test
        public void testBroadcastVariableSimple() {
                try {
-                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader());
+                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
                        
                        ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 
3, 4));
                        ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 
2.0, 3.0, 4.0));
@@ -97,7 +98,7 @@ public class RuntimeUDFContextTest {
        @Test
        public void testBroadcastVariableWithInitializer() {
                try {
-                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader());
+                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
                        
                        ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 
4));
                        
@@ -122,7 +123,7 @@ public class RuntimeUDFContextTest {
        @Test
        public void testResetBroadcastVariableWithInitializer() {
                try {
-                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader());
+                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
                        
                        ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 
4));
                        
@@ -145,7 +146,7 @@ public class RuntimeUDFContextTest {
        @Test
        public void testBroadcastVariableWithInitializerAndMismatch() {
                try {
-                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader());
+                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
                        
                        ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 
4));
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
index 9a0a2b5..d231455 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -63,9 +64,15 @@ public class FlatMapOperatorCollectionTest implements 
Serializable {
        }
 
        private void testExecuteOnCollection(FlatMapFunction<String, String> 
udf, List<String> input, boolean mutableSafe) throws Exception {
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               if (mutableSafe) {
+                       executionConfig.disableObjectReuse();
+               } else {
+                       executionConfig.enableObjectReuse();
+               }
                // run on collections
                final List<String> result = getTestFlatMapOperator(udf)
-                               .executeOnCollections(input, new 
RuntimeUDFContext("Test UDF", 4, 0, null), mutableSafe);
+                               .executeOnCollections(input, new 
RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig), executionConfig);
 
                Assert.assertEquals(input.size(), result.size());
                Assert.assertEquals(input, result);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
index feb2223..54975b4 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.operators.base;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.RichFlatJoinFunction;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
@@ -60,8 +61,11 @@ public class JoinOperatorBaseTest implements Serializable {
                List<Integer> expected = new 
ArrayList<Integer>(Arrays.asList(3, 3, 6 ,6));
 
                try {
-                       List<Integer> resultSafe = 
base.executeOnCollections(inputData1, inputData2, null, true);
-                       List<Integer> resultRegular = 
base.executeOnCollections(inputData1, inputData2, null, false);
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       executionConfig.disableObjectReuse();
+                       List<Integer> resultSafe = 
base.executeOnCollections(inputData1, inputData2, null, executionConfig);
+                       executionConfig.enableObjectReuse();
+                       List<Integer> resultRegular = 
base.executeOnCollections(inputData1, inputData2, null, executionConfig);
 
                        assertEquals(expected, resultSafe);
                        assertEquals(expected, resultRegular);
@@ -110,8 +114,11 @@ public class JoinOperatorBaseTest implements Serializable {
 
 
                try {
-                       List<Integer> resultSafe = 
base.executeOnCollections(inputData1, inputData2, new 
RuntimeUDFContext(taskName, 1, 0, null), true);
-                       List<Integer> resultRegular = 
base.executeOnCollections(inputData1, inputData2, new 
RuntimeUDFContext(taskName, 1, 0, null), false);
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       executionConfig.disableObjectReuse();
+                       List<Integer> resultSafe = 
base.executeOnCollections(inputData1, inputData2, new 
RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+                       executionConfig.enableObjectReuse();
+                       List<Integer> resultRegular = 
base.executeOnCollections(inputData1, inputData2, new 
RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
 
                        assertEquals(expected, resultSafe);
                        assertEquals(expected, resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
index fd23d40..8e07f07 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -52,8 +53,12 @@ public class MapOperatorTest implements java.io.Serializable 
{
                                        parser, new 
UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO), "TestMapper");
                        
                        List<String> input = new ArrayList<String>(asList("1", 
"2", "3", "4", "5", "6"));
-                       List<Integer> resultMutableSafe = 
op.executeOnCollections(input, null, true);
-                       List<Integer> resultRegular = 
op.executeOnCollections(input, null, false);
+
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       executionConfig.disableObjectReuse();
+                       List<Integer> resultMutableSafe = 
op.executeOnCollections(input, null, executionConfig);
+                       executionConfig.enableObjectReuse();
+                       List<Integer> resultRegular = 
op.executeOnCollections(input, null, executionConfig);
                        
                        assertEquals(asList(1, 2, 3, 4, 5, 6), 
resultMutableSafe);
                        assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);
@@ -97,8 +102,11 @@ public class MapOperatorTest implements 
java.io.Serializable {
                                        parser, new 
UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO), taskName);
                        
                        List<String> input = new ArrayList<String>(asList("1", 
"2", "3", "4", "5", "6"));
-                       List<Integer> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), 
true);
-                       List<Integer> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), 
false);
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       executionConfig.disableObjectReuse();
+                       List<Integer> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig), executionConfig);
+                       executionConfig.enableObjectReuse();
+                       List<Integer> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig), executionConfig);
                        
                        assertEquals(asList(1, 2, 3, 4, 5, 6), 
resultMutableSafe);
                        assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
index 50c6b98..61ba359 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
@@ -74,9 +75,12 @@ public class PartitionMapOperatorTest implements 
java.io.Serializable {
                                        parser, new 
UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO), taskName);
                        
                        List<String> input = new ArrayList<String>(asList("1", 
"2", "3", "4", "5", "6"));
-                       
-                       List<Integer> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), 
true);
-                       List<Integer> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null), 
false);
+
+                       ExecutionConfig executionConfig = new ExecutionConfig();
+                       executionConfig.disableObjectReuse();
+                       List<Integer> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig), executionConfig);
+                       executionConfig.enableObjectReuse();
+                       List<Integer> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig), executionConfig);
                        
                        assertEquals(asList(1, 2, 3, 4, 5, 6), 
resultMutableSafe);
                        assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 1531ae6..59bea0c 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -358,7 +358,7 @@ public abstract class SerializerTestBase<T> {
                        try {
                                ser2 = SerializationUtils.clone(ser1);
                        } catch (SerializationException e) {
-                               fail("The serializer is not serializable.");
+                               fail("The serializer is not serializable: " + 
e);
                                return;
                        }
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
index 2d57490..c61d624 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
@@ -29,7 +29,7 @@ public class CollectionEnvironment extends 
ExecutionEnvironment {
                Plan p = createProgramPlan(jobName);
 
                // We need to invert here: object reuse enabled means safe 
mode is disabled.
-               CollectionExecutor exec = new 
CollectionExecutor(!getConfig().isObjectReuseEnabled());
+               CollectionExecutor exec = new CollectionExecutor(getConfig());
                return exec.execute(p);
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index e5bb1fd..f2091e2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -69,8 +69,10 @@ import org.apache.flink.api.java.operators.UnionOperator;
 import org.apache.flink.api.java.operators.UnsortedGrouping;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
@@ -92,7 +94,7 @@ import com.google.common.base.Preconditions;
  */
 public abstract class DataSet<T> {
        
-       private final ExecutionEnvironment context;
+       protected final ExecutionEnvironment context;
        
        // NOTE: the type must not be accessed directly, but only via getType()
        private TypeInformation<T> type;
@@ -109,7 +111,11 @@ public abstract class DataSet<T> {
                }
 
                this.context = context;
-               this.type = typeInfo;
+               if (typeInfo instanceof PojoTypeInfo && 
context.getConfig().isForceKryoEnabled()) {
+                       this.type = new 
GenericTypeInfo<T>(typeInfo.getTypeClass());
+               } else {
+                       this.type = typeInfo;
+               }
        }
 
        /**
@@ -1300,7 +1306,7 @@ public abstract class DataSet<T> {
                
                // configure the type if needed
                if (outputFormat instanceof InputTypeConfigurable) {
-                       ((InputTypeConfigurable) 
outputFormat).setInputType(getType());
+                       ((InputTypeConfigurable) 
outputFormat).setInputType(getType(), context.getConfig() );
                }
                
                DataSink<T> sink = new DataSink<T>(this, outputFormat, 
getType());

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index af8095c..1105ab9 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -54,10 +54,10 @@ import org.apache.flink.api.java.operators.Operator;
 import org.apache.flink.api.java.operators.OperatorTranslation;
 import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
-import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
@@ -103,8 +103,7 @@ public abstract class ExecutionEnvironment {
        private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = 
new ArrayList<Tuple2<String, DistributedCacheEntry>>();
 
        private ExecutionConfig config = new ExecutionConfig();
-       
-       
+
        // 
--------------------------------------------------------------------------------------------
        //  Constructor and Properties
        // 
--------------------------------------------------------------------------------------------
@@ -117,14 +116,6 @@ public abstract class ExecutionEnvironment {
        }
 
        /**
-        * Sets the config object.
-        */
-       public void setConfig(ExecutionConfig config) {
-               Validate.notNull(config);
-               this.config = config;
-       }
-
-       /**
         * Gets the config object.
         */
        public ExecutionConfig getConfig() {
@@ -208,7 +199,7 @@ public abstract class ExecutionEnvironment {
        // 
--------------------------------------------------------------------------------------------
        //  Registry for types and serializers
        // 
--------------------------------------------------------------------------------------------
-       
+
        /**
         * Registers the given Serializer as a default serializer for the given 
type at the
         * {@link org.apache.flink.api.java.typeutils.runtime.KryoSerializer}.
@@ -220,11 +211,7 @@ public abstract class ExecutionEnvironment {
         * @param serializer The serializer to use.
         */
        public void registerKryoSerializer(Class<?> type, Serializer<?> 
serializer) {
-               if (type == null || serializer == null) {
-                       throw new NullPointerException("Cannot register null 
class or serializer.");
-               }
-               
-               KryoSerializer.registerSerializer(type, serializer);
+               config.registerKryoSerializer(type, serializer);
        }
 
        /**
@@ -235,11 +222,7 @@ public abstract class ExecutionEnvironment {
         * @param serializerClass The class of the serializer to use.
         */
        public void registerKryoSerializer(Class<?> type, Class<? extends 
Serializer<?>> serializerClass) {
-               if (type == null || serializerClass == null) {
-                       throw new NullPointerException("Cannot register null 
class or serializer.");
-               }
-               
-               KryoSerializer.registerSerializer(type, serializerClass);
+               config.registerKryoSerializer(type, serializerClass);
        }
        
        /**
@@ -254,10 +237,16 @@ public abstract class ExecutionEnvironment {
                if (type == null) {
                        throw new NullPointerException("Cannot register null 
type class.");
                }
-               
-               KryoSerializer.registerType(type);
+
+               TypeInformation<?> typeInfo = 
TypeExtractor.createTypeInfo(type);
+
+               if (typeInfo instanceof PojoTypeInfo) {
+                       config.registerPojoType(type);
+               } else {
+                       config.registerKryoType(type);
+               }
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
        //  Data set creations
        // 
--------------------------------------------------------------------------------------------
@@ -555,7 +544,7 @@ public abstract class ExecutionEnvironment {
                
                TypeInformation<X> type = 
TypeExtractor.getForObject(firstValue);
                CollectionInputFormat.checkCollection(data, 
type.getTypeClass());
-               return new DataSource<X>(this, new 
CollectionInputFormat<X>(data, type.createSerializer()), type, 
Utils.getCallLocationName());
+               return new DataSource<X>(this, new 
CollectionInputFormat<X>(data, type.createSerializer(config)), type, 
Utils.getCallLocationName());
        }
        
        /**
@@ -582,7 +571,7 @@ public abstract class ExecutionEnvironment {
        private <X> DataSource<X> fromCollection(Collection<X> data, 
TypeInformation<X> type, String callLocationName) {
                CollectionInputFormat.checkCollection(data, 
type.getTypeClass());
                
-               return new DataSource<X>(this, new 
CollectionInputFormat<X>(data, type.createSerializer()), type, 
callLocationName);
+               return new DataSource<X>(this, new 
CollectionInputFormat<X>(data, type.createSerializer(config)), type, 
callLocationName);
        }
        
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java 
b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
index 8e0ff5e..9239e25 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -216,7 +217,7 @@ public class CsvOutputFormat<T extends Tuple> extends 
FileOutputFormat<T> implem
         * is in fact a tuple type.
         */
        @Override
-       public void setInputType(TypeInformation<?> type) {
+       public void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
                if (!type.isTupleType()) {
                        throw new InvalidProgramException("The " + 
CsvOutputFormat.class.getSimpleName() +
                                " can only be used to write tuple data sets.");

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
index 4b2e0ed..23c19f7 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -89,7 +90,7 @@ public class LocalCollectionOutputFormat<T> implements 
OutputFormat<T>, InputTyp
 
        @Override
        @SuppressWarnings("unchecked")
-       public void setInputType(TypeInformation<?> type) {
-               this.typeSerializer = 
(TypeSerializer<T>)type.createSerializer();
+       public void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
+               this.typeSerializer = 
(TypeSerializer<T>)type.createSerializer(executionConfig);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
index 8e92c27..7d0dfaa 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.io;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.BinaryInputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -40,7 +41,8 @@ public class TypeSerializerInputFormat<T> extends 
BinaryInputFormat<T> implement
 
        public TypeSerializerInputFormat(TypeInformation<T> resultType) {
                this.resultType = resultType;
-               this.serializer = resultType.createSerializer();
+               // TODO: thread the environment's ExecutionConfig through to this
+               // constructor; a default instance ignores registered Kryo types/serializers.
+               this.serializer = resultType.createSerializer(new 
ExecutionConfig());
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
index a9b93ee..0c9ed80 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.io;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.BinaryOutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -52,7 +53,7 @@ public class TypeSerializerOutputFormat<T> extends 
BinaryOutputFormat<T> impleme
 
        @Override
        @SuppressWarnings("unchecked")
-       public void setInputType(TypeInformation<?> type) {
-               serializer = (TypeSerializer<T>) type.createSerializer();
+       public void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
+               serializer = (TypeSerializer<T>) 
type.createSerializer(executionConfig);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index 9ea28f7..acfb47a 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -226,7 +226,7 @@ public class CrossOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OUT,
                                TupleTypeInfo<OUT> returnType, CrossHint hint)
                {
                        super(input1, input2,
-                                       new ProjectCrossFunction<I1, I2, 
OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()),
+                                       new ProjectCrossFunction<I1, I2, 
OUT>(fields, isFromFirst, 
returnType.createSerializer(input1.getExecutionEnvironment().getConfig()).createInstance()),
                                        returnType, hint, "unknown");
                        
                        crossProjection = null;
@@ -236,7 +236,7 @@ public class CrossOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OUT,
                                TupleTypeInfo<OUT> returnType, 
CrossProjection<I1, I2> crossProjection, CrossHint hint)
                {
                        super(input1, input2,
-                               new ProjectCrossFunction<I1, I2, OUT>(fields, 
isFromFirst, returnType.createSerializer().createInstance()),
+                               new ProjectCrossFunction<I1, I2, OUT>(fields, 
isFromFirst, 
returnType.createSerializer(input1.getExecutionEnvironment().getConfig()).createInstance()),
                                returnType, hint, "unknown");
                        
                        this.crossProjection = crossProjection;

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index e60314f..8b61779 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -633,7 +633,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                
                protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, 
Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] 
isFromFirst, TupleTypeInfo<OUT> returnType) {
                        super(input1, input2, keys1, keys2, 
-                                       new ProjectFlatJoinFunction<I1, I2, 
OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()),
+                                       new ProjectFlatJoinFunction<I1, I2, 
OUT>(fields, isFromFirst, 
returnType.createSerializer(input1.getExecutionEnvironment().getConfig()).createInstance()),
                                        returnType, hint, 
Utils.getCallLocationName(4)); // We need to use the 4th element in the stack 
because the call comes through .types().
 
                        
@@ -642,7 +642,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                
                protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, 
Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] 
isFromFirst, TupleTypeInfo<OUT> returnType, JoinProjection<I1, I2> joinProj) {
                        super(input1, input2, keys1, keys2, 
-                                       new ProjectFlatJoinFunction<I1, I2, 
OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()),
+                                       new ProjectFlatJoinFunction<I1, I2, 
OUT>(fields, isFromFirst, 
returnType.createSerializer(input1.getExecutionEnvironment().getConfig()).createInstance()),
                                        returnType, hint, 
Utils.getCallLocationName(4));
                        
                        this.joinProj = joinProj;

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index bddef8f..16d9ff3 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -49,7 +49,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
        protected final int[] fields;
        
        private Projection<IN> proj;
-       
+
        public ProjectOperator(DataSet<IN> input, int[] fields, 
TupleTypeInfo<OUT> returnType) {
                super(input, returnType);
        
@@ -68,7 +68,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
        protected 
org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, 
MapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
                String name = getName() != null ? getName() : "Projection " + 
Arrays.toString(fields);
                // create operator
-               PlanProjectOperator<IN, OUT> ppo = new PlanProjectOperator<IN, 
OUT>(fields, name, getInputType(), getResultType());
+               PlanProjectOperator<IN, OUT> ppo = new PlanProjectOperator<IN, 
OUT>(fields, name, getInputType(), getResultType(), context.getConfig());
                // set input
                ppo.setInput(input);
                // set dop

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
index 1b452da..959b929 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.operators.translation;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -27,8 +28,8 @@ import org.apache.flink.api.java.tuple.Tuple;
 
 public class PlanProjectOperator<T, R extends Tuple> extends 
MapOperatorBase<T, R, MapFunction<T, R>> {
 
-       public PlanProjectOperator(int[] fields, String name, 
TypeInformation<T> inType, TypeInformation<R> outType) {
-               super(new MapProjector<T, R>(fields, 
outType.createSerializer().createInstance()), new UnaryOperatorInformation<T, 
R>(inType, outType), name);
+       public PlanProjectOperator(int[] fields, String name, 
TypeInformation<T> inType, TypeInformation<R> outType, ExecutionConfig 
executionConfig) {
+               super(new MapProjector<T, R>(fields, 
outType.createSerializer(executionConfig).createInstance()), new 
UnaryOperatorInformation<T, R>(inType, outType), name);
        }
        
        public static final class MapProjector<T, R extends Tuple>

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
index c2a0b7f..291acce 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -40,7 +41,7 @@ public class EnumTypeInfo<T extends Enum<T>> extends 
TypeInformation<T> implemen
        }
 
        @Override
-       public TypeComparator<T> createComparator(boolean sortOrderAscending) {
+       public TypeComparator<T> createComparator(boolean sortOrderAscending, 
ExecutionConfig executionConfig) {
                return new EnumComparator<T>(sortOrderAscending);
        }
 
@@ -75,7 +76,7 @@ public class EnumTypeInfo<T extends Enum<T>> extends 
TypeInformation<T> implemen
        }
 
        @Override
-       public TypeSerializer<T> createSerializer() {
+       public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
                return new EnumSerializer<T>(typeClass);
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 5bc6cb9..cb0ac31 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -25,7 +26,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
 import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
 
-
 public class GenericTypeInfo<T> extends TypeInformation<T> implements 
AtomicType<T> {
 
        private final Class<T> typeClass;
@@ -33,7 +33,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> 
implements AtomicType
        public GenericTypeInfo(Class<T> typeClass) {
                this.typeClass = typeClass;
        }
-       
+
        @Override
        public boolean isBasicType() {
                return false;
@@ -65,16 +65,16 @@ public class GenericTypeInfo<T> extends TypeInformation<T> 
implements AtomicType
        }
 
        @Override
-       public TypeSerializer<T> createSerializer() {
-               return new KryoSerializer<T>(this.typeClass);
+       public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+               return new KryoSerializer<T>(this.typeClass, config);
        }
 
        @SuppressWarnings("unchecked")
        @Override
-       public TypeComparator<T> createComparator(boolean sortOrderAscending) {
+       public TypeComparator<T> createComparator(boolean sortOrderAscending, 
ExecutionConfig executionConfig) {
                if (isKeyType()) {
                        @SuppressWarnings("rawtypes")
-                       GenericTypeComparator comparator = new 
GenericTypeComparator(sortOrderAscending, createSerializer(), this.typeClass);
+                       GenericTypeComparator comparator = new 
GenericTypeComparator(sortOrderAscending, createSerializer(executionConfig), 
this.typeClass);
                        return (TypeComparator<T>) comparator;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
index e26326e..f8b4247 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/InputTypeConfigurable.java
@@ -19,11 +19,13 @@
 package org.apache.flink.api.java.typeutils;
 
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /**
  * {@link org.apache.flink.api.common.io.OutputFormat}s can implement this 
interface to be configured
- * with the data type they will operate on. The method {@link 
#setInputType(TypeInformation)} will be
+ * with the data type they will operate on. The method {@link 
#setInputType(org.apache.flink.api
+ * .common.typeinfo.TypeInformation, 
org.apache.flink.api.common.ExecutionConfig)} will be
  * called when the output format is used with an output method such as
  * {@link 
org.apache.flink.api.java.DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
  */
@@ -32,8 +34,9 @@ public interface InputTypeConfigurable {
        /**
         * Method that is called on an {@link 
org.apache.flink.api.common.io.OutputFormat} when it is passed to
         * the DataSet's output method. May be used to configures the output 
format based on the data type.
-        * 
+        *
         * @param type The data type of the input.
+        * @param executionConfig The execution config carrying registered types and serializers.
         */
-       void setInputType(TypeInformation<?> type);
+       void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
index 10ab02f..f0da264 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -79,7 +80,7 @@ public class MissingTypeInfo extends 
TypeInformation<InvalidTypesException> {
        }
 
        @Override
-       public TypeSerializer<InvalidTypesException> createSerializer() {
+       public TypeSerializer<InvalidTypesException> 
createSerializer(ExecutionConfig executionConfig) {
                throw new UnsupportedOperationException("The missing type 
information cannot be used as a type information.");
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
index 664350e..b4bc86b 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
@@ -22,6 +22,7 @@ import java.lang.reflect.GenericArrayType;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -106,15 +107,15 @@ public class ObjectArrayTypeInfo<T, C> extends 
TypeInformation<T> {
 
        @SuppressWarnings("unchecked")
        @Override
-       public TypeSerializer<T> createSerializer() {
+       public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
                // use raw type for serializer if generic array type
                if (this.componentType instanceof GenericArrayType) {
                        ParameterizedType paramType = (ParameterizedType) 
((GenericArrayType) this.componentType).getGenericComponentType();
 
                        return (TypeSerializer<T>) new 
GenericArraySerializer<C>((Class<C>) paramType.getRawType(),
-                                       this.componentInfo.createSerializer());
+                                       
this.componentInfo.createSerializer(executionConfig));
                } else {
-                       return (TypeSerializer<T>) new 
GenericArraySerializer<C>((Class<C>) this.componentType, 
this.componentInfo.createSerializer());
+                       return (TypeSerializer<T>) new 
GenericArraySerializer<C>((Class<C>) this.componentType, 
this.componentInfo.createSerializer(executionConfig));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index fb5ca44..503b6f2 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -28,13 +28,14 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 
 import com.google.common.base.Joiner;
 
@@ -43,7 +44,7 @@ import com.google.common.base.Joiner;
  * TypeInformation for arbitrary (they have to be java-beans-style) java 
objects (what we call POJO).
  * 
  */
-public class PojoTypeInfo<T> extends CompositeType<T>{
+public class PojoTypeInfo<T> extends CompositeType<T> {
 
        private final static String REGEX_FIELD = 
"[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*";
        private final static String REGEX_NESTED_FIELDS = 
"("+REGEX_FIELD+")(\\.(.+))?";
@@ -269,7 +270,7 @@ public class PojoTypeInfo<T> extends CompositeType<T>{
        }
 
        @Override
-       protected TypeComparator<T> getNewComparator() {
+       protected TypeComparator<T> getNewComparator(ExecutionConfig config) {
                // first remove the null array fields
                final Field[] finalKeyFields = Arrays.copyOf(keyFields, 
comparatorHelperIndex);
                @SuppressWarnings("rawtypes")
@@ -277,21 +278,21 @@ public class PojoTypeInfo<T> extends CompositeType<T>{
                if(finalFieldComparators.length == 0 || finalKeyFields.length 
== 0 ||  finalFieldComparators.length != finalKeyFields.length) {
                        throw new IllegalArgumentException("Pojo comparator 
creation has a bug");
                }
-               return new PojoComparator<T>(finalKeyFields, 
finalFieldComparators, createSerializer(), typeClass);
+               return new PojoComparator<T>(finalKeyFields, 
finalFieldComparators, createSerializer(config), typeClass);
        }
 
 
        @Override
-       public TypeSerializer<T> createSerializer() {
+       public TypeSerializer<T> createSerializer(ExecutionConfig config) {
                TypeSerializer<?>[] fieldSerializers = new 
TypeSerializer<?>[fields.length ];
                Field[] reflectiveFields = new Field[fields.length];
 
                for (int i = 0; i < fields.length; i++) {
-                       fieldSerializers[i] = fields[i].type.createSerializer();
+                       fieldSerializers[i] = 
fields[i].type.createSerializer(config);
                        reflectiveFields[i] = fields[i].field;
                }
 
-               return new PojoSerializer<T>(this.typeClass, fieldSerializers, 
reflectiveFields);
+               return new PojoSerializer<T>(this.typeClass, fieldSerializers, 
reflectiveFields, config);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -312,7 +313,7 @@ public class PojoTypeInfo<T> extends CompositeType<T>{
                for (PojoField field : fields) {
                        fieldStrings.add(field.field.getName() + ": " + 
field.type.toString());
                }
-               return "PojoType<" + typeClass.getCanonicalName()
+               return "PojoType<" + typeClass.getName()
                                + ", fields = [" + Joiner.on(", 
").join(fieldStrings) + "]"
                                + ">";
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
index 2464f25..60fa110 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.record.RecordSerializer;
@@ -60,7 +61,7 @@ public class RecordTypeInfo extends TypeInformation<Record> {
        }
 
        @Override
-       public TypeSerializer<Record> createSerializer() {
+       public TypeSerializer<Record> createSerializer(ExecutionConfig config) {
                return RecordSerializer.get();
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index af258e2..c1e573d 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -20,11 +20,12 @@ package org.apache.flink.api.java.typeutils;
 
 import java.util.Arrays;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
 import org.apache.flink.api.java.tuple.*;
 //CHECKSTYLE.ON: AvoidStarImport
 import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
@@ -48,10 +49,10 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
        }
 
        @Override
-       public TupleSerializer<T> createSerializer() {
+       public TupleSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
                TypeSerializer<?>[] fieldSerializers = new 
TypeSerializer<?>[getArity()];
                for (int i = 0; i < types.length; i++) {
-                       fieldSerializers[i] = types[i].createSerializer();
+                       fieldSerializers[i] = 
types[i].createSerializer(executionConfig);
                }
                
                Class<T> tupleClass = getTypeClass();
@@ -81,7 +82,7 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
        }
 
        @Override
-       protected TypeComparator<T> getNewComparator() {
+       protected TypeComparator<T> getNewComparator(ExecutionConfig 
executionConfig) {
                @SuppressWarnings("rawtypes")
                final TypeComparator[] finalFieldComparators = 
Arrays.copyOf(fieldComparators, comparatorHelperIndex);
                final int[] finalLogicalKeyFields = 
Arrays.copyOf(logicalKeyFields, comparatorHelperIndex);
@@ -93,7 +94,7 @@ public final class TupleTypeInfo<T extends Tuple> extends 
TupleTypeInfoBase<T> {
                }
                TypeSerializer<?>[] fieldSerializers = new 
TypeSerializer<?>[maxKey + 1];
                for (int i = 0; i <= maxKey; i++) {
-                       fieldSerializers[i] = types[i].createSerializer();
+                       fieldSerializers[i] = 
types[i].createSerializer(executionConfig);
                }
                if(finalFieldComparators.length == 0 || 
finalLogicalKeyFields.length == 0 || fieldSerializers.length == 0 
                                || finalFieldComparators.length != 
finalLogicalKeyFields.length) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index c99a80f..1b3003f 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -1255,6 +1255,7 @@ public class TypeExtractor {
        @SuppressWarnings("unchecked")
        private <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> 
clazz, ArrayList<Type> typeHierarchy,
                        ParameterizedType parameterizedType, 
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+
                // add the hierarchy of the POJO itself if it is generic
                if (parameterizedType != null) {
                        getTypeHierarchy(typeHierarchy, parameterizedType, 
Object.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
index 79a2760..1486ee5 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -79,7 +80,7 @@ public class ValueTypeInfo<T extends Value> extends 
TypeInformation<T> implement
 
        @Override
        @SuppressWarnings("unchecked")
-       public TypeSerializer<T> createSerializer() {
+       public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
                if (CopyableValue.class.isAssignableFrom(type)) {
                        return (TypeSerializer<T>) 
createCopyableValueSerializer(type.asSubclass(CopyableValue.class));
                }
@@ -90,7 +91,7 @@ public class ValueTypeInfo<T extends Value> extends 
TypeInformation<T> implement
        
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
-       public TypeComparator<T> createComparator(boolean sortOrderAscending) {
+       public TypeComparator<T> createComparator(boolean sortOrderAscending, 
ExecutionConfig executionConfig) {
                if (!isKeyType()) {
                        throw new RuntimeException("The type " + type.getName() 
+ " is not Comparable.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
index 195ce25..89d7818 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -43,7 +44,7 @@ public class WritableTypeInfo<T extends Writable> extends 
TypeInformation<T> imp
 
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @Override
-       public TypeComparator<T> createComparator(boolean sortOrderAscending) {
+       public TypeComparator<T> createComparator(boolean sortOrderAscending, 
ExecutionConfig executionConfig) {
                if(Comparable.class.isAssignableFrom(typeClass)) {
                        return new WritableComparator(sortOrderAscending, 
typeClass);
                }
@@ -84,7 +85,7 @@ public class WritableTypeInfo<T extends Writable> extends 
TypeInformation<T> imp
        }
 
        @Override
-       public TypeSerializer<T> createSerializer() {
+       public TypeSerializer<T> createSerializer(ExecutionConfig 
executionConfig) {
                return new WritableSerializer<T>(typeClass);
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index c55cd71..133dd57 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -27,6 +27,7 @@ import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.twitter.chill.ScalaKryoInstantiator;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -36,9 +37,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.lang.reflect.Modifier;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -55,13 +53,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
        
        private static final long serialVersionUID = 3L;
 
-       private static Map<Class<?>, Serializer<?>> staticRegisteredSerializers 
= new HashMap<Class<?>, Serializer<?>>();
-       private static Map<Class<?>, Class<? extends Serializer<?>>> 
staticRegisteredSerializersClasses = new HashMap<Class<?>, Class<? extends 
Serializer<?>>>();
-       
-       private static Set<Class<?>> staticRegisteredTypes = new 
HashSet<Class<?>>();
-       
        // 
------------------------------------------------------------------------
-       
+
        private final Map<Class<?>, Serializer<?>> registeredSerializers;
        private final Map<Class<?>, Class<? extends Serializer<?>>> 
registeredSerializersClasses;
        private final Set<Class<?>> registeredTypes;
@@ -82,29 +75,15 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
        
        // 
------------------------------------------------------------------------
 
-       public KryoSerializer(Class<T> type){
+       public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
                if(type == null){
                        throw new NullPointerException("Type class cannot be 
null.");
                }
                this.type = type;
 
-               // create copies of the statically registered serializers
-               // we use static synchronization to safeguard against 
concurrent use
-               // of the static collections.
-               synchronized (KryoSerializer.class) {
-                       this.registeredSerializers = 
staticRegisteredSerializers.isEmpty() ?
-                               Collections.<Class<?>, Serializer<?>>emptyMap() 
:
-                               new HashMap<Class<?>, 
Serializer<?>>(staticRegisteredSerializers);
-               
-                       this.registeredSerializersClasses = 
staticRegisteredSerializersClasses.isEmpty() ?
-                               Collections.<Class<?>, Class<? extends 
Serializer<?>>>emptyMap() :
-                               new HashMap<Class<?>, Class<? extends 
Serializer<?>>>(staticRegisteredSerializersClasses);
-                               
-                       this.registeredTypes = staticRegisteredTypes.isEmpty() ?
-                               Collections.<Class<?>>emptySet() :
-                               new HashSet<Class<?>>(staticRegisteredTypes);
-               }
-               
+               this.registeredSerializers = 
executionConfig.getRegisteredKryoSerializers();
+               this.registeredSerializersClasses = 
executionConfig.getRegisteredKryoSerializersClasses();
+               this.registeredTypes = executionConfig.getRegisteredKryoTypes();
        }
 
        /**
@@ -289,69 +268,6 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
                        
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
                }
        }
-       
-       // 
--------------------------------------------------------------------------------------------
-       // For registering custom serializers and types
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Registers the given Serializer as a default serializer for the given 
class at the Kryo
-        * instance.
-        * Note that the serializer instance must be serializable (as defined 
by java.io.Serializable),
-        * because it may be distributed to the worker nodes by java 
serialization.
-        * 
-        * @param clazz The class of the types serialized with the given 
serializer.
-        * @param serializer The serializer to use.
-        * @throws IllegalArgumentException Thrown, if the serializer is not 
serializable.
-        */
-       public static void registerSerializer(Class<?> clazz, Serializer<?> 
serializer) {
-               if (clazz == null || serializer == null) {
-                       throw new NullPointerException("Cannot register null 
class or serializer.");
-               }
-               if (!(serializer instanceof java.io.Serializable)) {
-                       throw new IllegalArgumentException("The serializer 
instance must be serializable, (for distributing it in the cluster), "
-                                       + "as defined by java.io.Serializable. 
For stateless serializers, you can use the "
-                                       + "'registerSerializer(Class, Class)' 
method to register the serializer via its class.");
-               }
-               
-               synchronized (KryoSerializer.class) {
-                       staticRegisteredSerializers.put(clazz, serializer);
-               }
-       }
-
-       /**
-        * Registers a serializer via its class as a default serializer for the 
given class at the Kryo
-        * instance.
-        * 
-        * @param clazz The class of the types serialized with the given 
serializer.
-        * @param serializerClass The serializer to use.
-        */
-       public static void registerSerializer(Class<?> clazz, Class<? extends 
Serializer<?>> serializerClass) {
-               if (clazz == null || serializerClass == null) {
-                       throw new NullPointerException("Cannot register null 
class or serializer.");
-               }
-               
-               synchronized (KryoSerializer.class) {
-                       staticRegisteredSerializersClasses.put(clazz, 
serializerClass);
-               }
-       }
-       
-       /**
-        * Registers the given type with Kryo. Registering the type allows Kryo 
to write abbreviated
-        * name tags, rather than full class names, thereby vastly increasing 
the serialization
-        * performance in many cases.
-        *  
-        * @param type The class of the type to register.
-        */
-       public static void registerType(Class<?> type) {
-               if (type == null) {
-                       throw new NullPointerException("Cannot register null 
type class.");
-               }
-               
-               synchronized (KryoSerializer.class) {
-                       staticRegisteredTypes.add(type);
-               }
-       }
 
        // 
--------------------------------------------------------------------------------------------
        // For testing

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 15e8537..6033094 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -22,36 +22,96 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 
 public final class PojoSerializer<T> extends TypeSerializer<T> {
 
+       // Flags for the header
+       private static byte IS_NULL = 1;
+       private static byte NO_SUBCLASS = 2;
+       private static byte IS_SUBCLASS = 4;
+       private static byte IS_TAGGED_SUBCLASS = 8;
+
        private static final long serialVersionUID = 1L;
 
        private final Class<T> clazz;
 
-       private final TypeSerializer<Object>[] fieldSerializers;
+       private TypeSerializer<Object>[] fieldSerializers;
 
        // We need to handle these ourselves in writeObject()/readObject()
        private transient Field[] fields;
 
-       private final int numFields;
+       private int numFields;
+
+       private transient Map<Class<?>, TypeSerializer> subclassSerializerCache;
+       private transient ClassLoader cl;
+
+       private Map<Class<?>, Integer> registeredClasses;
+
+       private TypeSerializer[] registeredSerializers;
+
+       private final ExecutionConfig executionConfig;
 
        @SuppressWarnings("unchecked")
-       public PojoSerializer(Class<T> clazz, TypeSerializer<?>[] 
fieldSerializers, Field[] fields) {
+       public PojoSerializer(
+                       Class<T> clazz,
+                       TypeSerializer<?>[] fieldSerializers,
+                       Field[] fields,
+                       ExecutionConfig executionConfig) {
                this.clazz = clazz;
                this.fieldSerializers = (TypeSerializer<Object>[]) 
fieldSerializers;
                this.fields = fields;
                this.numFields = fieldSerializers.length;
+               this.executionConfig = executionConfig;
+
+               Set<Class<?>> registeredPojoTypes = 
executionConfig.getRegisteredPojoTypes();
 
                for (int i = 0; i < numFields; i++) {
                        this.fields[i].setAccessible(true);
                }
+
+               cl = Thread.currentThread().getContextClassLoader();
+
+               subclassSerializerCache = new HashMap<Class<?>, 
TypeSerializer>();
+
+               // We only want those classes that are not our own class and 
are actually sub-classes.
+               List<Class<?>> cleanedTaggedClasses = new 
ArrayList<Class<?>>(registeredPojoTypes.size());
+               for (Class<?> registeredClass: registeredPojoTypes) {
+                       if (registeredClass.equals(clazz)) {
+                               continue;
+                       }
+                       if (!clazz.isAssignableFrom(registeredClass)) {
+                               continue;
+                       }
+                       cleanedTaggedClasses.add(registeredClass);
+
+               }
+               this.registeredClasses = new LinkedHashMap<Class<?>, 
Integer>(cleanedTaggedClasses.size());
+               registeredSerializers = new 
TypeSerializer[cleanedTaggedClasses.size()];
+
+               int id = 0;
+               for (Class<?> registeredClass: cleanedTaggedClasses) {
+                       this.registeredClasses.put(registeredClass, id);
+                       TypeInformation<?> typeInfo = 
TypeExtractor.createTypeInfo(registeredClass);
+                       registeredSerializers[id] = 
typeInfo.createSerializer(executionConfig);
+
+                       id++;
+               }
        }
 
        private void writeObject(ObjectOutputStream out)
@@ -67,9 +127,9 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
        private void readObject(ObjectInputStream in)
                        throws IOException, ClassNotFoundException {
                in.defaultReadObject();
-               int numKeyFields = in.readInt();
-               fields = new Field[numKeyFields];
-               for (int i = 0; i < numKeyFields; i++) {
+               int numFields = in.readInt();
+               fields = new Field[numFields];
+               for (int i = 0; i < numFields; i++) {
                        Class<?> clazz = (Class<?>)in.readObject();
                        String fieldName = in.readUTF();
                        fields[i] = null;
@@ -88,8 +148,42 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                                + " (" + fieldName + ")");
                        }
                }
+
+               cl = Thread.currentThread().getContextClassLoader();
+               subclassSerializerCache = new HashMap<Class<?>, 
TypeSerializer>();
+       }
+
+       private TypeSerializer getSubclassSerializer(Class<?> subclass) {
+               TypeSerializer<?> result = 
subclassSerializerCache.get(subclass);
+               if (result == null) {
+
+                       TypeInformation<?> typeInfo = 
TypeExtractor.createTypeInfo(subclass);
+                       result = typeInfo.createSerializer(executionConfig);
+                       if (result instanceof PojoSerializer) {
+                               PojoSerializer<?> subclassSerializer = 
(PojoSerializer<?>) result;
+                               subclassSerializer.copyBaseFieldOrder(this);
+                       }
+                       subclassSerializerCache.put(subclass, result);
+
+               }
+               return result;
+       }
+
+       private boolean hasField(Field f) {
+               for (Field field: fields) {
+                       if (f.equals(field)) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       @SuppressWarnings("unchecked")
+       private void copyBaseFieldOrder(PojoSerializer<?> baseSerializer) {
+               // do nothing for now, but in the future, adapt subclass 
serializer to have same
+               // ordering as base class serializer so that binary comparison 
on base class fields
+               // can work
        }
-       
        
        @Override
        public boolean isImmutableType() {
@@ -110,7 +204,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                }
 
                if (stateful) {
-                       return new PojoSerializer<T>(clazz, 
duplicateFieldSerializers, fields);
+                       return new PojoSerializer<T>(clazz, 
duplicateFieldSerializers, fields, executionConfig);
                } else {
                        return this;
                }
@@ -119,13 +213,12 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
        
        @Override
        public T createInstance() {
+               if (clazz.isInterface() || 
Modifier.isAbstract(clazz.getModifiers())) {
+                       return null;
+               }
                try {
                        T t = clazz.newInstance();
-               
-                       for (int i = 0; i < numFields; i++) {
-                               fields[i].set(t, 
fieldSerializers[i].createInstance());
-                       }
-                       
+                       initializeFields(t);
                        return t;
                }
                catch (Exception e) {
@@ -133,51 +226,89 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                }
        }
 
+       protected void initializeFields(T t) {
+               for (int i = 0; i < numFields; i++) {
+                       try {
+                               fields[i].set(t, 
fieldSerializers[i].createInstance());
+                       } catch (IllegalAccessException e) {
+                               throw new RuntimeException("Cannot initialize 
fields.", e);
+                       }
+               }
+       }
+
        @Override
+       @SuppressWarnings("unchecked")
        public T copy(T from) {
-               T target;
-               try {
-                       target = clazz.newInstance();
-               }
-               catch (Throwable t) {
-                       throw new RuntimeException("Cannot instantiate class.", 
t);
+               if (from == null) {
+                       return null;
                }
-               
-               try {
-                       for (int i = 0; i < numFields; i++) {
-                               Object value = fields[i].get(from);
-                               if (value != null) {
-                                       Object copy = 
fieldSerializers[i].copy(value);
-                                       fields[i].set(target, copy);
-                               }
-                               else {
-                                       fields[i].set(target, null);
+
+               Class<?> actualType = from.getClass();
+               if (actualType == clazz) {
+                       T target;
+                       try {
+                               target = (T) from.getClass().newInstance();
+                       }
+                       catch (Throwable t) {
+                               throw new RuntimeException("Cannot instantiate 
class.", t);
+                       }
+                       // no subclass
+                       try {
+                               for (int i = 0; i < numFields; i++) {
+                                       Object value = fields[i].get(from);
+                                       if (value != null) {
+                                               Object copy = 
fieldSerializers[i].copy(value);
+                                               fields[i].set(target, copy);
+                                       }
+                                       else {
+                                               fields[i].set(target, null);
+                                       }
                                }
+                       } catch (IllegalAccessException e) {
+                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields before.");
+
                        }
+                       return target;
+               } else {
+                       // subclass
+                       TypeSerializer subclassSerializer = 
getSubclassSerializer(actualType);
+                       return (T) subclassSerializer.copy(from);
                }
-               catch (IllegalAccessException e) {
-                       throw new RuntimeException("Error during POJO copy, 
this should not happen since we check the fields before.");
-               }
-               return target;
        }
        
        @Override
+       @SuppressWarnings("unchecked")
        public T copy(T from, T reuse) {
-               try {
-                       for (int i = 0; i < numFields; i++) {
-                               Object value = fields[i].get(from);
-                               if (value != null) {
-                                       Object copy = 
fieldSerializers[i].copy(fields[i].get(from), fields[i].get(reuse));
-                                       fields[i].set(reuse, copy);
-                               }
-                               else {
-                                       fields[i].set(reuse, null);
+               if (from == null) {
+                       return null;
+               }
+
+               Class<?> actualType = from.getClass();
+               if (reuse == null || actualType != reuse.getClass()) {
+                       // cannot reuse, do a non-reuse copy
+                       return copy(from);
+               }
+
+               if (actualType == clazz) {
+                       try {
+                               for (int i = 0; i < numFields; i++) {
+                                       Object value = fields[i].get(from);
+                                       if (value != null) {
+                                               Object copy = 
fieldSerializers[i].copy(fields[i].get(from), fields[i].get(reuse));
+                                               fields[i].set(reuse, copy);
+                                       }
+                                       else {
+                                               fields[i].set(reuse, null);
+                                       }
                                }
+                       } catch (IllegalAccessException e) {
+                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields" + "before.");
                        }
-               } catch (IllegalAccessException e) {
-                       throw new RuntimeException("Error during POJO copy, 
this should not happen since we check the fields" +
-                                       "before.");
+               } else {
+                       TypeSerializer subclassSerializer = 
getSubclassSerializer(actualType);
+                       reuse = (T) subclassSerializer.copy(from, reuse);
                }
+
                return reuse;
        }
 
@@ -188,94 +319,228 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
 
        @Override
+       @SuppressWarnings("unchecked")
        public void serialize(T value, DataOutputView target) throws 
IOException {
+               int flags = 0;
                // handle null values
                if (value == null) {
-                       target.writeBoolean(true);
+                       flags |= IS_NULL;
+                       target.writeByte(flags);
                        return;
+               }
+
+               Integer subclassTag = -1;
+               Class<?> actualClass = value.getClass();
+               TypeSerializer subclassSerializer = null;
+               if (clazz != actualClass) {
+                       subclassTag = registeredClasses.get(actualClass);
+                       if (subclassTag != null) {
+                               flags |= IS_TAGGED_SUBCLASS;
+                               subclassSerializer = 
registeredSerializers[subclassTag];
+                       } else {
+                               flags |= IS_SUBCLASS;
+                               subclassSerializer = 
getSubclassSerializer(actualClass);
+                       }
                } else {
-                       target.writeBoolean(false);
+                       flags |= NO_SUBCLASS;
                }
-               try {
-                       for (int i = 0; i < numFields; i++) {
-                               Object o = fields[i].get(value);
-                               if(o == null) {
-                                       target.writeBoolean(true); // null 
field handling
-                               } else {
-                                       target.writeBoolean(false);
-                                       fieldSerializers[i].serialize(o, 
target);
+
+               target.writeByte(flags);
+
+               if ((flags & IS_SUBCLASS) != 0) {
+                       target.writeUTF(actualClass.getName());
+               } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+                       target.writeByte(subclassTag);
+               }
+
+
+               if ((flags & NO_SUBCLASS) != 0) {
+                       try {
+                               for (int i = 0; i < numFields; i++) {
+                                       Object o = fields[i].get(value);
+                                       if (o == null) {
+                                               target.writeBoolean(true); // 
null field handling
+                                       } else {
+                                               target.writeBoolean(false);
+                                               
fieldSerializers[i].serialize(o, target);
+                                       }
                                }
+                       } catch (IllegalAccessException e) {
+                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields" + "before.");
+
+                       }
+               } else {
+                       // subclass
+                       if (subclassSerializer != null) {
+                               subclassSerializer.serialize(value, target);
                        }
-               } catch (IllegalAccessException e) {
-                       throw new RuntimeException("Error during POJO copy, 
this should not happen since we check the fields" +
-                                       "before.");
                }
        }
 
        @Override
+       @SuppressWarnings("unchecked")
        public T deserialize(DataInputView source) throws IOException {
-               boolean isNull = source.readBoolean();
-               if(isNull) {
+               int flags = source.readByte();
+               if((flags & IS_NULL) != 0) {
                        return null;
                }
+
                T target;
-               try {
-                       target = clazz.newInstance();
-               }
-               catch (Throwable t) {
-                       throw new RuntimeException("Cannot instantiate class.", 
t);
+
+               Class<?> actualSubclass = null;
+               TypeSerializer subclassSerializer = null;
+
+               if ((flags & IS_SUBCLASS) != 0) {
+                       String subclassName = source.readUTF();
+                       try {
+                               actualSubclass = Class.forName(subclassName, 
true, cl);
+                       } catch (ClassNotFoundException e) {
+                               throw new RuntimeException("Cannot instantiate 
class.", e);
+                       }
+                       subclassSerializer = 
getSubclassSerializer(actualSubclass);
+                       target = (T) subclassSerializer.createInstance();
+                       // also initialize fields for which the subclass 
serializer is not responsible
+                       initializeFields(target);
+               } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+
+                       int subclassTag = source.readByte();
+                       subclassSerializer = registeredSerializers[subclassTag];
+                       target = (T) subclassSerializer.createInstance();
+                       // also initialize fields for which the subclass 
serializer is not responsible
+                       initializeFields(target);
+               } else {
+                       target = createInstance();
                }
-               
-               try {
-                       for (int i = 0; i < numFields; i++) {
-                               isNull = source.readBoolean();
-                               if(isNull) {
-                                       fields[i].set(target, null);
-                               } else {
-                                       Object field = 
fieldSerializers[i].deserialize(source);
-                                       fields[i].set(target, field);
+
+               if ((flags & NO_SUBCLASS) != 0) {
+                       try {
+                               for (int i = 0; i < numFields; i++) {
+                                       boolean isNull = source.readBoolean();
+                                       if (isNull) {
+                                               fields[i].set(target, null);
+                                       } else {
+                                               Object field = 
fieldSerializers[i].deserialize(source);
+                                               fields[i].set(target, field);
+                                       }
                                }
+                       } catch (IllegalAccessException e) {
+                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields" + "before.");
+
+                       }
+               } else {
+                       if (subclassSerializer != null) {
+                               target = (T) 
subclassSerializer.deserialize(target, source);
                        }
-               } catch (IllegalAccessException e) {
-                       throw new RuntimeException("Error during POJO copy, 
this should not happen since we check the fields" +
-                                       "before.");
                }
                return target;
        }
        
        @Override
+       @SuppressWarnings("unchecked")
        public T deserialize(T reuse, DataInputView source) throws IOException {
+
                // handle null values
-               boolean isNull = source.readBoolean();
-               if (isNull) {
+               int flags = source.readByte();
+               if((flags & IS_NULL) != 0) {
                        return null;
                }
-               try {
-                       for (int i = 0; i < numFields; i++) {
-                               isNull = source.readBoolean();
-                               if(isNull) {
-                                       fields[i].set(reuse, null);
-                               } else {
-                                       Object field = 
fieldSerializers[i].deserialize(fields[i].get(reuse), source);
-                                       fields[i].set(reuse, field);
+
+               Class<?> subclass = null;
+               TypeSerializer subclassSerializer = null;
+               if ((flags & IS_SUBCLASS) != 0) {
+                       String subclassName = source.readUTF();
+                       try {
+                               subclass = Class.forName(subclassName, true, 
cl);
+                       } catch (ClassNotFoundException e) {
+                               throw new RuntimeException("Cannot instantiate 
class.", e);
+                       }
+                       subclassSerializer = getSubclassSerializer(subclass);
+
+                       if (reuse == null || subclass != reuse.getClass()) {
+                               // cannot reuse
+                               reuse = (T) subclassSerializer.createInstance();
+                               // also initialize fields for which the 
subclass serializer is not responsible
+                               initializeFields(reuse);
+                       }
+               } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+                       int subclassTag = source.readByte();
+                       subclassSerializer = registeredSerializers[subclassTag];
+
+                       if (reuse == null || 
((PojoSerializer)subclassSerializer).clazz != reuse.getClass()) {
+                               // cannot reuse
+                               reuse = (T) subclassSerializer.createInstance();
+                               // also initialize fields for which the 
subclass serializer is not responsible
+                               initializeFields(reuse);
+                       }
+               } else {
+                       if (reuse == null || clazz != reuse.getClass()) {
+                               reuse = createInstance();
+                       }
+               }
+
+               if ((flags & NO_SUBCLASS) != 0) {
+                       try {
+                               for (int i = 0; i < numFields; i++) {
+                                       boolean isNull = source.readBoolean();
+                                       if (isNull) {
+                                               fields[i].set(reuse, null);
+                                       } else {
+                                               Object field = 
fieldSerializers[i].deserialize(fields[i].get(reuse), source);
+
+                                               fields[i].set(reuse, field);
+                                       }
                                }
+                       } catch (IllegalAccessException e) {
+                               throw new RuntimeException("Error during POJO 
copy, this should not happen since we check the fields" + "before.");
+                       }
+               } else {
+                       if (subclassSerializer != null) {
+                               reuse = (T) 
subclassSerializer.deserialize(reuse, source);
                        }
-               } catch (IllegalAccessException e) {
-                       throw new RuntimeException("Error during POJO copy, 
this should not happen since we check the fields" +
-                                       "before.");
                }
+
                return reuse;
        }
 
        @Override
        public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-               // copy the Non-Null/Null tag
-               target.writeBoolean(source.readBoolean());
-               for (int i = 0; i < numFields; i++) {
-                       boolean isNull = source.readBoolean();
-                       target.writeBoolean(isNull);
-                       if (!isNull) {
-                               fieldSerializers[i].copy(source, target);
+               // copy the flags
+               int flags = source.readByte();
+               target.writeByte(flags);
+
+               if ((flags & IS_NULL) != 0) {
+                       // is a null value, nothing further to copy
+                       return;
+               }
+
+               TypeSerializer<?> subclassSerializer = null;
+               if ((flags & IS_SUBCLASS) != 0) {
+                       String className = source.readUTF();
+                       target.writeUTF(className);
+                       try {
+                               Class<?> subclass = Class.forName(className, 
true, Thread.currentThread()
+                                               .getContextClassLoader());
+                               subclassSerializer = 
getSubclassSerializer(subclass);
+                       } catch (ClassNotFoundException e) {
+                               throw new RuntimeException("Cannot instantiate 
class.", e);
+                       }
+               } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+                       int subclassTag = source.readByte();
+                       target.writeByte(subclassTag);
+                       subclassSerializer = registeredSerializers[subclassTag];
+               }
+
+               if ((flags & NO_SUBCLASS) != 0) {
+                       for (int i = 0; i < numFields; i++) {
+                               boolean isNull = source.readBoolean();
+                               target.writeBoolean(isNull);
+                               if (!isNull) {
+                                       fieldSerializers[i].copy(source, 
target);
+                               }
+                       }
+               } else {
+                       if (subclassSerializer != null) {
+                               subclassSerializer.copy(source, target);
                        }
                }
        }

Reply via email to