[FLINK-1369] [types] Add support for Subclasses, Interfaces, Abstract Classes.

- Abstract classes with fields are handled as POJO types.
- Interfaces and abstract classes without fields are handled as generic types.

This closes #236
This closes #316


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7407076d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7407076d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7407076d

Branch: refs/heads/master
Commit: 7407076d3990752eb5fa4072cd036efd2f656cbc
Parents: 6b402f4
Author: Aljoscha Krettek <[email protected]>
Authored: Wed Nov 26 13:27:06 2014 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Tue Feb 10 13:15:03 2015 +0100

----------------------------------------------------------------------
 .../compiler/postpass/JavaApiPostPass.java      |  15 +-
 .../flink/compiler/util/NoOpBinaryUdfOp.java    |   3 +-
 .../flink/compiler/util/NoOpUnaryUdfOp.java     |   3 +-
 flink-core/pom.xml                              |  48 +-
 .../flink/api/common/ExecutionConfig.java       | 160 ++++++-
 .../api/common/functions/RuntimeContext.java    |   7 +
 .../util/AbstractRuntimeUDFContext.java         |  17 +-
 .../functions/util/RuntimeUDFContext.java       |   9 +-
 .../common/operators/CollectionExecutor.java    |  32 +-
 .../api/common/operators/DualInputOperator.java |   3 +-
 .../common/operators/GenericDataSinkBase.java   |   7 +-
 .../common/operators/GenericDataSourceBase.java |   5 +-
 .../common/operators/SingleInputOperator.java   |   3 +-
 .../flink/api/common/operators/Union.java       |   3 +-
 .../operators/base/BulkIterationBase.java       |   3 +-
 .../operators/base/CoGroupOperatorBase.java     |  27 +-
 .../base/CollectorMapOperatorBase.java          |   3 +-
 .../operators/base/CrossOperatorBase.java       |  13 +-
 .../operators/base/DeltaIterationBase.java      |   3 +-
 .../operators/base/FilterOperatorBase.java      |   3 +-
 .../operators/base/FlatMapOperatorBase.java     |  11 +-
 .../operators/base/GroupReduceOperatorBase.java |  23 +-
 .../common/operators/base/JoinOperatorBase.java |  21 +-
 .../common/operators/base/MapOperatorBase.java  |  11 +-
 .../base/MapPartitionOperatorBase.java          |  11 +-
 .../operators/base/PartitionOperatorBase.java   |   3 +-
 .../operators/base/ReduceOperatorBase.java      |  14 +-
 .../flink/api/common/typeinfo/AtomicType.java   |   3 +-
 .../api/common/typeinfo/BasicArrayTypeInfo.java |   5 +-
 .../api/common/typeinfo/BasicTypeInfo.java      |   5 +-
 .../api/common/typeinfo/NothingTypeInfo.java    |   3 +-
 .../common/typeinfo/PrimitiveArrayTypeInfo.java |   3 +-
 .../api/common/typeinfo/TypeInformation.java    |   9 +-
 .../api/common/typeutils/CompositeType.java     |  11 +-
 .../functions/util/RuntimeUDFContextTest.java   |  11 +-
 .../base/FlatMapOperatorCollectionTest.java     |   9 +-
 .../operators/base/JoinOperatorBaseTest.java    |  15 +-
 .../common/operators/base/MapOperatorTest.java  |  16 +-
 .../base/PartitionMapOperatorTest.java          |  10 +-
 .../common/typeutils/SerializerTestBase.java    |   2 +-
 .../flink/api/java/CollectionEnvironment.java   |   2 +-
 .../java/org/apache/flink/api/java/DataSet.java |  12 +-
 .../flink/api/java/ExecutionEnvironment.java    |  43 +-
 .../flink/api/java/io/CsvOutputFormat.java      |   3 +-
 .../java/io/LocalCollectionOutputFormat.java    |   5 +-
 .../api/java/io/TypeSerializerInputFormat.java  |   4 +-
 .../api/java/io/TypeSerializerOutputFormat.java |   5 +-
 .../flink/api/java/operators/CrossOperator.java |   4 +-
 .../flink/api/java/operators/JoinOperator.java  |   4 +-
 .../api/java/operators/ProjectOperator.java     |   4 +-
 .../translation/PlanProjectOperator.java        |   5 +-
 .../flink/api/java/typeutils/EnumTypeInfo.java  |   5 +-
 .../api/java/typeutils/GenericTypeInfo.java     |  12 +-
 .../java/typeutils/InputTypeConfigurable.java   |   9 +-
 .../api/java/typeutils/MissingTypeInfo.java     |   3 +-
 .../api/java/typeutils/ObjectArrayTypeInfo.java |   7 +-
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  17 +-
 .../api/java/typeutils/RecordTypeInfo.java      |   3 +-
 .../flink/api/java/typeutils/TupleTypeInfo.java |  11 +-
 .../flink/api/java/typeutils/TypeExtractor.java |   1 +
 .../flink/api/java/typeutils/ValueTypeInfo.java |   5 +-
 .../api/java/typeutils/WritableTypeInfo.java    |   5 +-
 .../java/typeutils/runtime/KryoSerializer.java  |  96 +---
 .../java/typeutils/runtime/PojoSerializer.java  | 459 +++++++++++++++----
 .../base/CoGroupOperatorCollectionTest.java     |  20 +-
 .../operators/base/GroupReduceOperatorTest.java |  17 +-
 .../operators/base/JoinOperatorBaseTest.java    |   9 +-
 .../operators/base/ReduceOperatorTest.java      |  15 +-
 .../api/java/io/CollectionInputFormatTest.java  |   5 +-
 .../api/java/io/TypeSerializerFormatTest.java   |   3 +-
 .../java/type/extractor/TypeExtractorTest.java  |  38 +-
 .../api/java/typeutils/CompositeTypeTest.java   |   3 +-
 .../api/java/typeutils/TypeInfoParserTest.java  |   8 +-
 .../runtime/CopyableValueComparatorTest.java    |   2 -
 .../runtime/KryoGenericArraySerializerTest.java |   3 +-
 .../runtime/KryoGenericTypeComparatorTest.java  |   3 +-
 .../runtime/KryoGenericTypeSerializerTest.java  |   5 +-
 .../runtime/KryoVersusAvroMinibenchmark.java    |  13 +-
 .../runtime/KryoWithCustomSerializersTest.java  |  12 +-
 .../MultidimensionalArraySerializerTest.java    |  11 +-
 .../typeutils/runtime/PojoComparatorTest.java   |   5 +-
 .../runtime/PojoGenericTypeSerializerTest.java  |   3 +-
 .../typeutils/runtime/PojoSerializerTest.java   |  13 +-
 .../runtime/PojoSubclassComparatorTest.java     |  76 +++
 .../runtime/PojoSubclassSerializerTest.java     | 196 ++++++++
 .../SubclassFromInterfaceSerializerTest.java    | 170 +++++++
 .../runtime/TupleComparatorILD2Test.java        |   2 -
 .../runtime/TupleComparatorILD3Test.java        |   2 -
 .../runtime/TupleComparatorILDC3Test.java       |   2 -
 .../runtime/TupleComparatorILDX1Test.java       |   2 -
 .../runtime/TupleComparatorILDXC2Test.java      |   2 -
 .../runtime/TupleComparatorISD1Test.java        |   2 -
 .../runtime/TupleComparatorISD2Test.java        |   2 -
 .../runtime/TupleComparatorISD3Test.java        |   2 -
 .../typeutils/runtime/TupleSerializerTest.java  |   3 +-
 .../typeutils/runtime/ValueComparatorTest.java  |   2 -
 .../runtime/WritableComparatorTest.java         |   2 -
 .../runtime/WritableSerializerTest.java         |   4 +-
 .../java/type/lambdas/LambdaExtractionTest.java |   1 -
 .../task/AbstractIterativePactTask.java         |   7 +-
 .../jobgraph/tasks/AbstractInvokable.java       |  27 ++
 .../flink/runtime/operators/DataSourceTask.java |  18 -
 .../runtime/operators/PactTaskContext.java      |   6 +-
 .../runtime/operators/RegularPactTask.java      |  19 +-
 .../operators/chaining/ChainedDriver.java       |   2 +-
 .../operators/sort/LargeRecordHandler.java      |  11 +-
 .../util/DistributedRuntimeUDFContext.java      |   9 +-
 .../drivers/AllGroupReduceDriverTest.java       |  15 +-
 .../operators/drivers/AllReduceDriverTest.java  |  27 +-
 .../drivers/GroupReduceDriverTest.java          |  39 +-
 .../drivers/ReduceCombineDriverTest.java        |  39 +-
 .../operators/drivers/ReduceDriverTest.java     |  39 +-
 .../sort/ExternalSortLargeRecordsITCase.java    |  17 +-
 .../sort/LargeRecordHandlerITCase.java          |   7 +-
 .../operators/sort/LargeRecordHandlerTest.java  |  13 +-
 .../sort/MassiveStringSortingITCase.java        |   5 +-
 .../sort/MassiveStringValueSortingITCase.java   |   5 +-
 .../scala/operators/ScalaAggregateOperator.java |  15 +-
 .../scala/operators/ScalaCsvInputFormat.java    |   7 +-
 .../scala/operators/ScalaCsvOutputFormat.java   |   3 +-
 .../apache/flink/api/scala/CrossDataSet.scala   |   5 +-
 .../flink/api/scala/ExecutionEnvironment.scala  |   8 +-
 .../api/scala/UnfinishedCoGroupOperation.scala  |   6 +-
 .../flink/api/scala/codegen/TypeAnalyzer.scala  |   8 +-
 .../api/scala/codegen/TypeInformationGen.scala  |  12 +-
 .../apache/flink/api/scala/joinDataSet.scala    |   6 +-
 .../api/scala/typeutils/CaseClassTypeInfo.scala |  11 +-
 .../api/scala/typeutils/EitherTypeInfo.scala    |  17 +-
 .../api/scala/typeutils/OptionTypeInfo.scala    |   6 +-
 .../scala/typeutils/TraversableTypeInfo.scala   |   3 +-
 .../api/scala/typeutils/TrySerializer.scala     |   5 +-
 .../flink/api/scala/typeutils/TryTypeInfo.scala |   8 +-
 .../mapred/HadoopReduceCombineFunction.java     |   4 +-
 .../mapred/HadoopReduceFunction.java            |   4 +-
 .../wrapper/HadoopTupleUnwrappingIterator.java  |   5 +-
 .../HadoopTupleUnwrappingIteratorTest.java      |   4 +-
 .../apache/flink/streaming/api/StreamGraph.java |  19 +-
 .../streaming/api/datastream/DataStream.java    |   2 +-
 .../temporaloperator/StreamJoinOperator.java    |   8 +-
 .../environment/StreamExecutionEnvironment.java |  63 ++-
 .../aggregation/AggregationFunction.java        |   4 +-
 .../aggregation/ComparableAggregator.java       |   2 +-
 .../api/function/aggregation/SumAggregator.java |   2 +-
 .../api/function/source/FileSourceFunction.java |   4 +-
 .../api/invokable/StreamInvokable.java          |   9 +-
 .../operator/GroupedWindowInvokable.java        |   2 +-
 .../invokable/operator/ProjectInvokable.java    |   2 +-
 .../api/invokable/operator/co/CoInvokable.java  |   3 +-
 .../streamrecord/StreamRecordSerializer.java    |   5 +-
 .../api/streamvertex/CoStreamVertex.java        |   2 +-
 .../api/streamvertex/StreamVertex.java          |   4 +-
 .../streamvertex/StreamingRuntimeContext.java   |   5 +-
 .../streaming/util/keys/KeySelectorUtil.java    |   6 +-
 .../streaming/api/AggregationFunctionTest.java  |   3 +-
 .../flink/streaming/util/MockCoContext.java     |   7 +-
 .../flink/streaming/util/MockContext.java       |   5 +-
 .../flink/streaming/api/scala/DataStream.scala  |   6 +-
 .../api/scala/StreamCrossOperator.scala         |   6 +-
 .../api/scala/StreamExecutionEnvironment.scala  |  30 ++
 .../api/scala/StreamJoinOperator.scala          |  25 +-
 .../api/scala/WindowedDataStream.scala          |   8 +-
 .../WordCountSubclassInterfacePOJOITCase.java   | 152 ++++++
 .../WordCountSubclassPOJOITCase.java            | 123 +++++
 .../scala/io/CollectionInputFormatTest.scala    |  10 +-
 .../misc/MassiveCaseClassSortingITCase.scala    |   9 +-
 .../scala/runtime/CaseClassComparatorTest.scala |   9 +-
 .../runtime/KryoGenericTypeSerializerTest.scala |  13 +-
 .../ScalaSpecialTypesSerializerTest.scala       |   6 +-
 .../runtime/TraversableSerializerTest.scala     |   5 +-
 .../scala/runtime/TupleComparatorILD2Test.scala |   8 +-
 .../scala/runtime/TupleComparatorILD3Test.scala |   9 +-
 .../runtime/TupleComparatorILDC3Test.scala      |   9 +-
 .../runtime/TupleComparatorILDX1Test.scala      |   5 +-
 .../runtime/TupleComparatorILDXC2Test.scala     |   5 +-
 .../scala/runtime/TupleComparatorISD1Test.scala |   5 +-
 .../scala/runtime/TupleComparatorISD2Test.scala |   5 +-
 .../scala/runtime/TupleComparatorISD3Test.scala |   9 +-
 .../api/scala/runtime/TupleSerializerTest.scala |  10 +-
 .../scala/types/TypeInformationGenTest.scala    |   1 +
 179 files changed, 2211 insertions(+), 801 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
index 208ff2e..11ac231 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.DualInputOperator;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.Operator;
@@ -70,9 +71,13 @@ public class JavaApiPostPass implements OptimizerPostPass {
        
        private final Set<PlanNode> alreadyDone = new HashSet<PlanNode>();
 
+       private ExecutionConfig executionConfig = null;
        
        @Override
        public void postPass(OptimizedPlan plan) {
+
+               executionConfig = 
plan.getOriginalPactPlan().getExecutionConfig();
+
                for (SinkPlanNode sink : plan.getDataSinks()) {
                        traverse(sink);
                }
@@ -275,22 +280,22 @@ public class JavaApiPostPass implements OptimizerPostPass 
{
                }
        }
        
-       private static <T> TypeSerializerFactory<?> 
createSerializer(TypeInformation<T> typeInfo) {
-               TypeSerializer<T> serializer = typeInfo.createSerializer();
+       private <T> TypeSerializerFactory<?> 
createSerializer(TypeInformation<T> typeInfo) {
+               TypeSerializer<T> serializer = 
typeInfo.createSerializer(executionConfig);
 
                return new RuntimeSerializerFactory<T>(serializer, 
typeInfo.getTypeClass());
        }
        
        @SuppressWarnings("unchecked")
-       private static <T> TypeComparatorFactory<?> 
createComparator(TypeInformation<T> typeInfo, FieldList keys, boolean[] 
sortOrder) {
+       private <T> TypeComparatorFactory<?> 
createComparator(TypeInformation<T> typeInfo, FieldList keys, boolean[] 
sortOrder) {
                
                TypeComparator<T> comparator;
                if (typeInfo instanceof CompositeType) {
-                       comparator = ((CompositeType<T>) 
typeInfo).createComparator(keys.toArray(), sortOrder, 0);
+                       comparator = ((CompositeType<T>) 
typeInfo).createComparator(keys.toArray(), sortOrder, 0, executionConfig);
                }
                else if (typeInfo instanceof AtomicType) {
                        // handle grouping of atomic types
-                       comparator = ((AtomicType<T>) 
typeInfo).createComparator(sortOrder[0]);
+                       comparator = ((AtomicType<T>) 
typeInfo).createComparator(sortOrder[0], executionConfig);
                }
                else {
                        throw new RuntimeException("Unrecognized type: " + 
typeInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
index 4b48ec7..166b7b8 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpBinaryUdfOp.java
@@ -20,6 +20,7 @@ package org.apache.flink.compiler.util;
 
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.NoOpFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
@@ -43,7 +44,7 @@ public class NoOpBinaryUdfOp<OUT> extends 
DualInputOperator<OUT, OUT, OUT, NoOpF
        }
 
        @Override
-       protected List<OUT> executeOnCollections(List<OUT> inputData1, 
List<OUT> inputData2, RuntimeContext runtimeContext, boolean mutables) {
+       protected List<OUT> executeOnCollections(List<OUT> inputData1, 
List<OUT> inputData2, RuntimeContext runtimeContext, ExecutionConfig 
executionConfig) {
                throw new UnsupportedOperationException();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
index 474d3a4..5013ae5 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpUnaryUdfOp.java
@@ -20,6 +20,7 @@ package org.apache.flink.compiler.util;
 
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.NoOpFunction;
 import org.apache.flink.api.common.operators.RecordOperator;
@@ -54,7 +55,7 @@ public class NoOpUnaryUdfOp<OUT> extends 
SingleInputOperator<OUT, OUT, NoOpFunct
        }
 
        @Override
-       protected List<OUT> executeOnCollections(List<OUT> inputData, 
RuntimeContext runtimeContext, boolean mutables) {
+       protected List<OUT> executeOnCollections(List<OUT> inputData, 
RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
                return inputData;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 810860e..182a77a 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -46,7 +46,18 @@ under the License.
                        <artifactId>commons-collections</artifactId>
                        <!-- managed version -->
                </dependency>
-               
+
+               <dependency>
+                       <groupId>com.esotericsoftware.kryo</groupId>
+                       <artifactId>kryo</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.twitter</groupId>
+                       <artifactId>chill_2.10</artifactId>
+                       <version>0.5.1</version>
+               </dependency>
+
                <!--  guava needs to be in "provided" scope, to make sure it is 
not included into the jars by the shading -->
                <dependency>
                        <groupId>com.google.guava</groupId>
@@ -72,4 +83,39 @@ under the License.
                </plugins>
        </build>
 
+       <!-- See main pom.xml for explanation of profiles -->
+       <profiles>
+               <profile>
+                       <id>hadoop-1</id>
+                       <activation>
+                               <property>
+                                       <!-- Please do not remove the 'hadoop1' 
comment. See ./tools/generate_specific_pom.sh -->
+                                       
<!--hadoop1--><name>hadoop.profile</name><value>1</value>
+                               </property>
+                       </activation>
+                       <dependencies>
+                               <!-- "Old" Hadoop = MapReduce v1 -->
+                               <dependency>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-core</artifactId>
+                               </dependency>
+                       </dependencies>
+               </profile>
+               <profile>
+                       <id>hadoop-2</id>
+                       <activation>
+                               <property>
+                                       <!-- Please do not remove the 'hadoop2' 
comment. See ./tools/generate_specific_pom.sh -->
+                                       
<!--hadoop2--><name>!hadoop.profile</name>
+                               </property>
+                       </activation>
+                       <dependencies>
+                               <dependency>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-common</artifactId>
+                               </dependency>
+                       </dependencies>
+               </profile>
+       </profiles>
+
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 17e683f..5fa01b7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -18,7 +18,13 @@
 
 package org.apache.flink.api.common;
 
+import com.esotericsoftware.kryo.Serializer;
+
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * A configuration config for configuring behavior of the system, such as 
whether to use
@@ -34,11 +40,17 @@ public class ExecutionConfig implements Serializable {
        private boolean useClosureCleaner = true;
        private int degreeOfParallelism = -1;
        private int numberOfExecutionRetries = -1;
+       private boolean forceKryo = false;
 
-       // For future use...
-//     private boolean forceGenericSerializer = false;
        private boolean objectReuse = false;
 
+
+       // Serializers and types registered with Kryo and the PojoSerializer
+       private final Map<Class<?>, Serializer<?>> registeredKryoSerializers = 
new HashMap<Class<?>, Serializer<?>>();
+       private final Map<Class<?>, Class<? extends Serializer<?>>> 
registeredKryoSerializersClasses = new HashMap<Class<?>, Class<? extends 
Serializer<?>>>();
+       private final Set<Class<?>> registeredKryoTypes = new 
HashSet<Class<?>>();
+       private final Set<Class<?>> registeredPojoTypes = new 
HashSet<Class<?>>();
+
        /**
         * Enables the ClosureCleaner. This analyzes user code functions and 
sets fields to null
         * that are not used. This will in most cases make closures or 
anonymous inner classes
@@ -128,21 +140,26 @@ public class ExecutionConfig implements Serializable {
                return this;
        }
 
-       // These are for future use...
-//     public ExecutionConfig forceGenericSerializer() {
-//             forceGenericSerializer = true;
-//             return this;
-//     }
-//
-//     public ExecutionConfig disableForceGenericSerializer() {
-//             forceGenericSerializer = false;
-//             return this;
-//     }
-//
-//     public boolean isForceGenericSerializerEnabled() {
-//             return forceGenericSerializer;
-//     }
-//
+       /**
+        * Forces the TypeExtractor to use the Kryo serializer for POJOs, even 
though they could be analyzed as POJOs.
+        * In some cases this might be preferable. For example, when using 
interfaces
+        * with subclasses that cannot be analyzed as POJO.
+        */
+       public void enableForceKryo() {
+               forceKryo = true;
+       }
+
+
+       /**
+        * Disable use of Kryo serializer for all POJOs.
+        */
+       public void disableForceKryo() {
+               forceKryo = false;
+       }
+
+       public boolean isForceKryoEnabled() {
+               return forceKryo;
+       }
 
        /**
         * Enables reusing objects that Flink internally uses for 
deserialization and passing
@@ -169,4 +186,113 @@ public class ExecutionConfig implements Serializable {
        public boolean isObjectReuseEnabled() {
                return objectReuse;
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Registry for types and serializers
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Registers the given Serializer as a default serializer for the given 
type at the
+        * {@link org.apache.flink.api.common.typeutils.runtime.KryoSerializer}.
+        *
+        * Note that the serializer instance must be serializable (as defined 
by java.io.Serializable),
+        * because it may be distributed to the worker nodes by java 
serialization.
+        *
+        * @param type The class of the types serialized with the given 
serializer.
+        * @param serializer The serializer to use.
+        */
+       public void registerKryoSerializer(Class<?> type, Serializer<?> 
serializer) {
+               if (type == null || serializer == null) {
+                       throw new NullPointerException("Cannot register null 
class or serializer.");
+               }
+               if (!(serializer instanceof java.io.Serializable)) {
+                       throw new IllegalArgumentException("The serializer 
instance must be serializable, (for distributing it in the cluster), "
+                                       + "as defined by java.io.Serializable. 
For stateless serializers, you can use the "
+                                       + "'registerSerializer(Class, Class)' 
method to register the serializer via its class.");
+               }
+
+               registeredKryoSerializers.put(type, serializer);
+       }
+
+       /**
+        * Registers the given Serializer via its class as a serializer for the 
given type at the
+        * {@link org.apache.flink.api.common.typeutils.runtime.KryoSerializer}.
+        *
+        * @param type The class of the types serialized with the given 
serializer.
+        * @param serializerClass The class of the serializer to use.
+        */
+       public void registerKryoSerializer(Class<?> type, Class<? extends 
Serializer<?>> serializerClass) {
+               if (type == null || serializerClass == null) {
+                       throw new NullPointerException("Cannot register null 
class or serializer.");
+               }
+
+               registeredKryoSerializersClasses.put(type, serializerClass);
+       }
+
+       /**
+        * Registers the given type with the serialization stack. If the type 
is eventually
+        * serialized as a POJO, then the type is registered with the POJO 
serializer. If the
+        * type ends up being serialized with Kryo, then it will be registered 
at Kryo to make
+        * sure that only tags are written.
+        *
+        * @param type The class of the type to register.
+        */
+       public void registerPojoType(Class<?> type) {
+               if (type == null) {
+                       throw new NullPointerException("Cannot register null 
type class.");
+               }
+               registeredPojoTypes.add(type);
+       }
+
+       /**
+        * Registers the given type with Kryo only; unlike registerPojoType, it 
is not added to
+        * the set of registered POJO types. If the type ends up being 
serialized with Kryo, then
+        * it will be registered at Kryo to make sure that only tags are 
written.
+        *
+        * @param type The class of the type to register.
+        */
+       public void registerKryoType(Class<?> type) {
+               if (type == null) {
+                       throw new NullPointerException("Cannot register null 
type class.");
+               }
+               registeredKryoTypes.add(type);
+       }
+
+       /**
+        * Returns the registered Kryo Serializers.
+        */
+       public Map<Class<?>, Serializer<?>> getRegisteredKryoSerializers() {
+               return registeredKryoSerializers;
+       }
+
+       /**
+        * Returns the registered Kryo Serializer classes.
+        */
+       public Map<Class<?>, Class<? extends Serializer<?>>> 
getRegisteredKryoSerializersClasses() {
+               return registeredKryoSerializersClasses;
+       }
+
+       /**
+        * Returns the registered Kryo types.
+        */
+       public Set<Class<?>> getRegisteredKryoTypes() {
+               if (isForceKryoEnabled()) {
+                       // if we force kryo, we must also return all the types 
that
+                       // were previously only registered as POJO
+                       Set<Class<?>> result = new HashSet<Class<?>>();
+                       result.addAll(registeredKryoTypes);
+                       result.addAll(registeredPojoTypes);
+                       return result;
+               } else {
+                       return registeredKryoTypes;
+               }
+       }
+
+       /**
+        * Returns the registered POJO types.
+        */
+       public Set<Class<?>> getRegisteredPojoTypes() {
+               return registeredPojoTypes;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index ab938c0..e9209a8 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.DoubleCounter;
 import org.apache.flink.api.common.accumulators.Histogram;
@@ -60,6 +61,12 @@ public interface RuntimeContext {
         * @return The number of the parallel subtask.
         */
        int getIndexOfThisSubtask();
+
+       /**
+        * Returns the {@link org.apache.flink.api.common.ExecutionConfig} for 
the currently executing
+        * job.
+        */
+       ExecutionConfig getExecutionConfig();
        
        /**
         * Gets the ClassLoader to load classes that were are not in system's 
classpath, but are part of the

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 6b755e1..c04548c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.FutureTask;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.accumulators.DoubleCounter;
@@ -46,23 +47,31 @@ public abstract class AbstractRuntimeUDFContext implements 
RuntimeContext {
 
        private final ClassLoader userCodeClassLoader;
 
+       private final ExecutionConfig executionConfig;
+
        private final HashMap<String, Accumulator<?, ?>> accumulators = new 
HashMap<String, Accumulator<?, ?>>();
        
        private final DistributedCache distributedCache = new 
DistributedCache();
        
        
-       public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, 
int subtaskIndex, ClassLoader userCodeClassLoader) {
+       public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, 
int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig 
executionConfig) {
                this.name = name;
                this.numParallelSubtasks = numParallelSubtasks;
                this.subtaskIndex = subtaskIndex;
                this.userCodeClassLoader = userCodeClassLoader;
+               this.executionConfig = executionConfig;
        }
        
-       public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, 
int subtaskIndex, ClassLoader userCodeClassLoader, Map<String, 
FutureTask<Path>> cpTasks) {
-               this(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader);
+       public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, 
int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig 
executionConfig, Map<String, FutureTask<Path>> cpTasks) {
+               this(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig);
                this.distributedCache.setCopyTasks(cpTasks);
        }
-       
+
+       @Override
+       public ExecutionConfig getExecutionConfig() {
+               return executionConfig;
+       }
+
        @Override
        public String getTaskName() {
                return this.name;

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
index 74fddef..b9c98cd 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.FutureTask;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.core.fs.Path;
@@ -37,12 +38,12 @@ public class RuntimeUDFContext extends 
AbstractRuntimeUDFContext {
        private final HashMap<String, List<?>> uninitializedBroadcastVars = new 
HashMap<String, List<?>>();
        
        
-       public RuntimeUDFContext(String name, int numParallelSubtasks, int 
subtaskIndex, ClassLoader userCodeClassLoader) {
-               super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader);
+       public RuntimeUDFContext(String name, int numParallelSubtasks, int 
subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) 
{
+               super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig);
        }
        
-       public RuntimeUDFContext(String name, int numParallelSubtasks, int 
subtaskIndex, ClassLoader userCodeClassLoader, Map<String, FutureTask<Path>> 
cpTasks) {
-               super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, cpTasks);
+       public RuntimeUDFContext(String name, int numParallelSubtasks, int 
subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, 
Map<String, FutureTask<Path>> cpTasks) {
+               super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig, cpTasks);
        }
        
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index ea68554..afccd7c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
@@ -69,15 +70,18 @@ public class CollectionExecutor {
        private final ClassLoader classLoader;
        
        private final boolean mutableObjectSafeMode;
+
+       private final ExecutionConfig executionConfig;
        
        // 
--------------------------------------------------------------------------------------------
        
-       public CollectionExecutor() {
-               this(DEFAULT_MUTABLE_OBJECT_SAFE_MODE);
+       public CollectionExecutor(ExecutionConfig executionConfig) {
+               this(executionConfig, DEFAULT_MUTABLE_OBJECT_SAFE_MODE);
        }
                
-       public CollectionExecutor(boolean mutableObjectSafeMode) {
+       public CollectionExecutor(ExecutionConfig executionConfig, boolean 
mutableObjectSafeMode) {
                this.mutableObjectSafeMode = mutableObjectSafeMode;
+               this.executionConfig = executionConfig;
                
                this.intermediateResults = new HashMap<Operator<?>, List<?>>();
                this.accumulators = new HashMap<String, Accumulator<?,?>>();
@@ -161,13 +165,13 @@ public class CollectionExecutor {
                @SuppressWarnings("unchecked")
                GenericDataSinkBase<IN> typedSink = (GenericDataSinkBase<IN>) 
sink;
 
-               typedSink.executeOnCollections(input);
+               typedSink.executeOnCollections(input, executionConfig);
        }
        
        private <OUT> List<OUT> executeDataSource(GenericDataSourceBase<?, ?> 
source) throws Exception {
                @SuppressWarnings("unchecked")
                GenericDataSourceBase<OUT, ?> typedSource = 
(GenericDataSourceBase<OUT, ?>) source;
-               return typedSource.executeOnCollections(mutableObjectSafeMode);
+               return typedSource.executeOnCollections(executionConfig, 
mutableObjectSafeMode);
        }
        
        private <IN, OUT> List<OUT> executeUnaryOperator(SingleInputOperator<?, 
?, ?> operator, int superStep) throws Exception {
@@ -185,8 +189,8 @@ public class CollectionExecutor {
                // build the runtime context and compute broadcast variables, 
if necessary
                RuntimeUDFContext ctx;
                if 
(RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass()))
 {
-                       ctx = superStep == 0 ? new 
RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader()) :
-                                       new 
IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader);
+                       ctx = superStep == 0 ? new 
RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader(), 
executionConfig) :
+                                       new 
IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader, 
executionConfig);
                        
                        for (Map.Entry<String, Operator<?>> bcInputs : 
operator.getBroadcastInputs().entrySet()) {
                                List<?> bcData = execute(bcInputs.getValue());
@@ -196,7 +200,7 @@ public class CollectionExecutor {
                        ctx = null;
                }
                
-               List<OUT> result = typedOp.executeOnCollections(inputData, ctx, 
mutableObjectSafeMode);
+               List<OUT> result = typedOp.executeOnCollections(inputData, ctx, 
executionConfig);
                
                if (ctx != null) {
                        AccumulatorHelper.mergeInto(this.accumulators, 
ctx.getAllAccumulators());
@@ -227,8 +231,8 @@ public class CollectionExecutor {
                // build the runtime context and compute broadcast variables, 
if necessary
                RuntimeUDFContext ctx;
                if 
(RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass()))
 {
-                       ctx = superStep == 0 ? new 
RuntimeUDFContext(operator.getName(), 1, 0, classLoader) :
-                               new 
IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader);
+                       ctx = superStep == 0 ? new 
RuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig) :
+                               new 
IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader, 
executionConfig);
                        
                        for (Map.Entry<String, Operator<?>> bcInputs : 
operator.getBroadcastInputs().entrySet()) {
                                List<?> bcData = execute(bcInputs.getValue());
@@ -238,7 +242,7 @@ public class CollectionExecutor {
                        ctx = null;
                }
                
-               List<OUT> result = typedOp.executeOnCollections(inputData1, 
inputData2, ctx, mutableObjectSafeMode);
+               List<OUT> result = typedOp.executeOnCollections(inputData1, 
inputData2, ctx, executionConfig);
                
                if (ctx != null) {
                        AccumulatorHelper.mergeInto(this.accumulators, 
ctx.getAllAccumulators());
@@ -349,7 +353,7 @@ public class CollectionExecutor {
 
                int[] keyColumns = iteration.getSolutionSetKeyFields();
                boolean[] inputOrderings = new boolean[keyColumns.length];
-               TypeComparator<T> inputComparator = ((CompositeType<T>) 
solutionType).createComparator(keyColumns, inputOrderings, 0);
+               TypeComparator<T> inputComparator = ((CompositeType<T>) 
solutionType).createComparator(keyColumns, inputOrderings, 0, executionConfig);
 
                Map<TypeComparable<T>, T> solutionMap = new 
HashMap<TypeComparable<T>, T>(solutionInputData.size());
                // fill the solution from the initial input
@@ -482,8 +486,8 @@ public class CollectionExecutor {
 
                private final int superstep;
 
-               public IterationRuntimeUDFContext(String name, int 
numParallelSubtasks, int subtaskIndex, int superstep, ClassLoader classloader) {
-                       super(name, numParallelSubtasks, subtaskIndex, 
classloader);
+               public IterationRuntimeUDFContext(String name, int 
numParallelSubtasks, int subtaskIndex, int superstep, ClassLoader classloader, 
ExecutionConfig executionConfig) {
+                       super(name, numParallelSubtasks, subtaskIndex, 
classloader, executionConfig);
                        this.superstep = superstep;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
index 9cdea6d..f43f847 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/DualInputOperator.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators;
 
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
@@ -286,5 +287,5 @@ public abstract class DualInputOperator<IN1, IN2, OUT, FT 
extends Function> exte
        
        // 
--------------------------------------------------------------------------------------------
        
-       protected abstract List<OUT> executeOnCollections(List<IN1> inputData1, 
List<IN2> inputData2, RuntimeContext runtimeContext, boolean 
mutableObjectSafeMode) throws Exception;
+       protected abstract List<OUT> executeOnCollections(List<IN1> inputData1, 
List<IN2> inputData2, RuntimeContext runtimeContext, ExecutionConfig 
executionConfig) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
index 242e83d..2f8a396 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.io.FinalizeOnMaster;
 import org.apache.flink.api.common.io.InitializeOnMaster;
@@ -298,7 +299,7 @@ public class GenericDataSinkBase<IN> extends 
Operator<Nothing> {
        
        // 
--------------------------------------------------------------------------------------------
        
-       protected void executeOnCollections(List<IN> inputData) throws 
Exception {
+       protected void executeOnCollections(List<IN> inputData, ExecutionConfig 
executionConfig) throws Exception {
                OutputFormat<IN> format = 
this.formatWrapper.getUserCodeObject();
                TypeInformation<IN> inputType = 
getInput().getOperatorInfo().getOutputType();
 
@@ -308,9 +309,9 @@ public class GenericDataSinkBase<IN> extends 
Operator<Nothing> {
 
                        final TypeComparator<IN> sortComparator;
                        if (inputType instanceof CompositeType) {
-                               sortComparator = ((CompositeType<IN>) 
inputType).createComparator(sortColumns, sortOrderings, 0);
+                               sortComparator = ((CompositeType<IN>) 
inputType).createComparator(sortColumns, sortOrderings, 0, executionConfig);
                        } else if (inputType instanceof AtomicType) {
-                               sortComparator = ((AtomicType) 
inputType).createComparator(sortOrderings[0]);
+                               sortComparator = ((AtomicType) 
inputType).createComparator(sortOrderings[0], executionConfig);
                        } else {
                                throw new UnsupportedOperationException("Local 
output sorting does not support type "+inputType+" yet.");
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
index ad1b2e4..13c5dad 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
@@ -22,6 +22,7 @@ package org.apache.flink.api.common.operators;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
@@ -176,7 +177,7 @@ public class GenericDataSourceBase<OUT, T extends 
InputFormat<OUT, ?>> extends O
        
        // 
--------------------------------------------------------------------------------------------
        
-       protected List<OUT> executeOnCollections(boolean mutableObjectSafe) 
throws Exception {
+       protected List<OUT> executeOnCollections(ExecutionConfig 
executionConfig, boolean mutableObjectSafe) throws Exception {
                @SuppressWarnings("unchecked")
                InputFormat<OUT, InputSplit> inputFormat = (InputFormat<OUT, 
InputSplit>) this.formatWrapper.getUserCodeObject();
                inputFormat.configure(this.parameters);
@@ -185,7 +186,7 @@ public class GenericDataSourceBase<OUT, T extends 
InputFormat<OUT, ?>> extends O
                
                // splits
                InputSplit[] splits = inputFormat.createInputSplits(1);
-               TypeSerializer<OUT> serializer = 
getOperatorInfo().getOutputType().createSerializer();
+               TypeSerializer<OUT> serializer = 
getOperatorInfo().getOutputType().createSerializer(executionConfig);
                
                for (InputSplit split : splits) {
                        inputFormat.open(split);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
index eddf89b..ada4ab0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/SingleInputOperator.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators;
 
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
@@ -203,5 +204,5 @@ public abstract class SingleInputOperator<IN, OUT, FT 
extends Function> extends
        
        // 
--------------------------------------------------------------------------------------------
        
-       protected abstract List<OUT> executeOnCollections(List<IN> inputData, 
RuntimeContext runtimeContext, boolean mutableObjectSafeMode) throws Exception;
+       protected abstract List<OUT> executeOnCollections(List<IN> inputData, 
RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws 
Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
index d7d0e20..9586c5d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -47,7 +48,7 @@ public class Union<T> extends DualInputOperator<T, T, T, 
AbstractRichFunction> {
        }
 
        @Override
-       protected List<T> executeOnCollections(List<T> inputData1, List<T> 
inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) {
+       protected List<T> executeOnCollections(List<T> inputData1, List<T> 
inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
                ArrayList<T> result = new ArrayList<T>(inputData1.size() + 
inputData2.size());
                result.addAll(inputData1);
                result.addAll(inputData2);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
index 31080cd..6304197 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -298,7 +299,7 @@ public class BulkIterationBase<T> extends 
SingleInputOperator<T, T, AbstractRich
        }
 
        @Override
-       protected List<T> executeOnCollections(List<T> inputData, 
RuntimeContext runtimeContext, boolean mutableObjectSafeMode) {
+       protected List<T> executeOnCollections(List<T> inputData, 
RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
                throw new UnsupportedOperationException();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
index 65b9d1c..4165f3d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.Partitioner;
@@ -186,7 +187,7 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends 
CoGroupFunction<IN1,
        // 
------------------------------------------------------------------------
 
        @Override
-       protected List<OUT> executeOnCollections(List<IN1> input1, List<IN2> 
input2, RuntimeContext ctx, boolean mutableObjectSafe) throws Exception {
+       protected List<OUT> executeOnCollections(List<IN1> input1, List<IN2> 
input2, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
                // 
--------------------------------------------------------------------
                // Setup
                // 
--------------------------------------------------------------------
@@ -196,17 +197,19 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT 
extends CoGroupFunction<IN1,
                // for the grouping / merging comparator
                int[] inputKeys1 = getKeyColumns(0);
                int[] inputKeys2 = getKeyColumns(1);
+
+               boolean objectReuseDisabled = 
!executionConfig.isObjectReuseEnabled();
                
                boolean[] inputDirections1 = new boolean[inputKeys1.length];
                boolean[] inputDirections2 = new boolean[inputKeys2.length];
                Arrays.fill(inputDirections1, true);
                Arrays.fill(inputDirections2, true);
                
-               final TypeSerializer<IN1> inputSerializer1 = 
inputType1.createSerializer();
-               final TypeSerializer<IN2> inputSerializer2 = 
inputType2.createSerializer();
+               final TypeSerializer<IN1> inputSerializer1 = 
inputType1.createSerializer(executionConfig);
+               final TypeSerializer<IN2> inputSerializer2 = 
inputType2.createSerializer(executionConfig);
                
-               final TypeComparator<IN1> inputComparator1 = 
getTypeComparator(inputType1, inputKeys1, inputDirections1);
-               final TypeComparator<IN2> inputComparator2 = 
getTypeComparator(inputType2, inputKeys2, inputDirections2);
+               final TypeComparator<IN1> inputComparator1 = 
getTypeComparator(executionConfig, inputType1, inputKeys1, inputDirections1);
+               final TypeComparator<IN2> inputComparator2 = 
getTypeComparator(executionConfig, inputType2, inputKeys2, inputDirections2);
                
                final TypeComparator<IN1> inputSortComparator1;
                final TypeComparator<IN2> inputSortComparator2;
@@ -227,7 +230,7 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends 
CoGroupFunction<IN1,
                        Arrays.fill(allSortDirections, 0, inputKeys1.length, 
true);
                        System.arraycopy(groupSortDirections, 0, 
allSortDirections, inputKeys1.length, groupSortDirections.length);
                        
-                       inputSortComparator1 = getTypeComparator(inputType1, 
allSortKeys, allSortDirections);
+                       inputSortComparator1 = 
getTypeComparator(executionConfig, inputType1, allSortKeys, allSortDirections);
                }
                
                if (groupOrder2 == null || groupOrder2.getNumberOfFields() == 
0) {
@@ -246,12 +249,12 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT 
extends CoGroupFunction<IN1,
                        Arrays.fill(allSortDirections, 0, inputKeys2.length, 
true);
                        System.arraycopy(groupSortDirections, 0, 
allSortDirections, inputKeys2.length, groupSortDirections.length);
                        
-                       inputSortComparator2 = getTypeComparator(inputType2, 
allSortKeys, allSortDirections);
+                       inputSortComparator2 = 
getTypeComparator(executionConfig, inputType2, allSortKeys, allSortDirections);
                }
 
                CoGroupSortListIterator<IN1, IN2> coGroupIterator =
                                new CoGroupSortListIterator<IN1, IN2>(input1, 
inputSortComparator1, inputComparator1, inputSerializer1,
-                                               input2, inputSortComparator2, 
inputComparator2, inputSerializer2, mutableObjectSafe);
+                                               input2, inputSortComparator2, 
inputComparator2, inputSerializer2, objectReuseDisabled);
 
                // 
--------------------------------------------------------------------
                // Run UDF
@@ -262,8 +265,8 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends 
CoGroupFunction<IN1,
                FunctionUtils.openFunction(function, parameters);
 
                List<OUT> result = new ArrayList<OUT>();
-               Collector<OUT> resultCollector = mutableObjectSafe ?
-                               new CopyingListCollector<OUT>(result, 
getOperatorInfo().getOutputType().createSerializer()) :
+               Collector<OUT> resultCollector = objectReuseDisabled ?
+                               new CopyingListCollector<OUT>(result, 
getOperatorInfo().getOutputType().createSerializer(executionConfig)) :
                                new ListCollector<OUT>(result);
 
                while (coGroupIterator.next()) {
@@ -275,12 +278,12 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT 
extends CoGroupFunction<IN1,
                return result;
        }
 
-       private <T> TypeComparator<T> getTypeComparator(TypeInformation<T> 
inputType, int[] inputKeys, boolean[] inputSortDirections) {
+       private <T> TypeComparator<T> getTypeComparator(ExecutionConfig 
executionConfig, TypeInformation<T> inputType, int[] inputKeys, boolean[] 
inputSortDirections) {
                if (!(inputType instanceof CompositeType)) {
                        throw new InvalidProgramException("Input types of 
coGroup must be composite types.");
                }
 
-               return ((CompositeType<T>) 
inputType).createComparator(inputKeys, inputSortDirections, 0);
+               return ((CompositeType<T>) 
inputType).createComparator(inputKeys, inputSortDirections, 0, executionConfig);
        }
 
        private static class CoGroupSortListIterator<IN1, IN2> {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
index 8ad91c6..62bdfbe 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.operators.base;
 
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.operators.SingleInputOperator;
@@ -52,7 +53,7 @@ public class CollectorMapOperatorBase<IN, OUT, FT extends 
GenericCollectorMap<IN
        // 
--------------------------------------------------------------------------------------------
        
        @Override
-       protected List<OUT> executeOnCollections(List<IN> inputData, 
RuntimeContext ctx, boolean mutableObjectSafeMode) {
+       protected List<OUT> executeOnCollections(List<IN> inputData, 
RuntimeContext ctx, ExecutionConfig executionConfig) {
                throw new UnsupportedOperationException();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
index c6ceef0..f20659c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -84,18 +85,20 @@ public class CrossOperatorBase<IN1, IN2, OUT, FT extends 
CrossFunction<IN1, IN2,
        // 
--------------------------------------------------------------------------------------------
 
        @Override
-       protected List<OUT> executeOnCollections(List<IN1> inputData1, 
List<IN2> inputData2, RuntimeContext ctx, boolean mutableObjectSafeMode) throws 
Exception {
+       protected List<OUT> executeOnCollections(List<IN1> inputData1, 
List<IN2> inputData2, RuntimeContext ctx, ExecutionConfig executionConfig) 
throws Exception {
                CrossFunction<IN1, IN2, OUT> function = 
this.userFunction.getUserCodeObject();
                
                FunctionUtils.setFunctionRuntimeContext(function, ctx);
                FunctionUtils.openFunction(function, this.parameters);
+
+               boolean objectReuseDisabled = 
!executionConfig.isObjectReuseEnabled();
                
                ArrayList<OUT> result = new ArrayList<OUT>(inputData1.size() * 
inputData2.size());
                
-               if (mutableObjectSafeMode) {
-                       TypeSerializer<IN1> inSerializer1 = 
getOperatorInfo().getFirstInputType().createSerializer();
-                       TypeSerializer<IN2> inSerializer2 = 
getOperatorInfo().getSecondInputType().createSerializer();
-                       TypeSerializer<OUT> outSerializer = 
getOperatorInfo().getOutputType().createSerializer();
+               if (objectReuseDisabled) {
+                       TypeSerializer<IN1> inSerializer1 = 
getOperatorInfo().getFirstInputType().createSerializer(executionConfig);
+                       TypeSerializer<IN2> inSerializer2 = 
getOperatorInfo().getSecondInputType().createSerializer(executionConfig);
+                       TypeSerializer<OUT> outSerializer = 
getOperatorInfo().getOutputType().createSerializer(executionConfig);
                        
                        for (IN1 element1 : inputData1) {
                                for (IN2 element2 : inputData2) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
index f945b1d..2986534 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -332,7 +333,7 @@ public class DeltaIterationBase<ST, WT> extends 
DualInputOperator<ST, WT, ST, Ab
        }
 
        @Override
-       protected List<ST> executeOnCollections(List<ST> inputData1, List<WT> 
inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafeMode) {
+       protected List<ST> executeOnCollections(List<ST> inputData1, List<WT> 
inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
                throw new UnsupportedOperationException();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
index f4bd537..4db5265 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -50,7 +51,7 @@ public class FilterOperatorBase<T, FT extends 
FlatMapFunction<T, T>> extends Sin
        }
 
        @Override
-       protected List<T> executeOnCollections(List<T> inputData, 
RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
+       protected List<T> executeOnCollections(List<T> inputData, 
RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
                FlatMapFunction<T, T> function = 
this.userFunction.getUserCodeObject();
                
                FunctionUtils.setFunctionRuntimeContext(function, ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
index 8312a99..615ba87 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.CopyingListCollector;
@@ -53,17 +54,19 @@ public class FlatMapOperatorBase<IN, OUT, FT extends 
FlatMapFunction<IN, OUT>> e
        // 
------------------------------------------------------------------------
 
        @Override
-       protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext 
ctx, boolean mutableObjectSafeMode) throws Exception {
+       protected List<OUT> executeOnCollections(List<IN> input, RuntimeContext 
ctx, ExecutionConfig executionConfig) throws Exception {
                FlatMapFunction<IN, OUT> function = 
userFunction.getUserCodeObject();
                
                FunctionUtils.setFunctionRuntimeContext(function, ctx);
                FunctionUtils.openFunction(function, parameters);
 
                ArrayList<OUT> result = new ArrayList<OUT>(input.size());
+
+               boolean objectReuseDisabled = 
!executionConfig.isObjectReuseEnabled();
                
-               if (mutableObjectSafeMode) {
-                       TypeSerializer<IN> inSerializer = 
getOperatorInfo().getInputType().createSerializer();
-                       TypeSerializer<OUT> outSerializer = 
getOperatorInfo().getOutputType().createSerializer();
+               if (objectReuseDisabled) {
+                       TypeSerializer<IN> inSerializer = 
getOperatorInfo().getInputType().createSerializer(executionConfig);
+                       TypeSerializer<OUT> outSerializer = 
getOperatorInfo().getOutputType().createSerializer(executionConfig);
                        
                        CopyingListCollector<OUT> resultCollector = new 
CopyingListCollector<OUT>(result, outSerializer);
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index ddfd874..f4f7d0f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -152,9 +153,11 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends 
GroupReduceFunction<IN,
        // 
--------------------------------------------------------------------------------------------
 
        @Override
-       protected List<OUT> executeOnCollections(List<IN> inputData, 
RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
+       protected List<OUT> executeOnCollections(List<IN> inputData, 
RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
                GroupReduceFunction<IN, OUT> function = 
this.userFunction.getUserCodeObject();
 
+               boolean objectReuseDisabled = 
!executionConfig.isObjectReuseEnabled();
+
                UnaryOperatorInformation<IN, OUT> operatorInfo = 
getOperatorInfo();
                TypeInformation<IN> inputType = operatorInfo.getInputType();
 
@@ -176,7 +179,7 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends 
GroupReduceFunction<IN,
                        if(sortColumns.length == 0) { // => all reduce. No 
comparator
                                
Preconditions.checkArgument(sortOrderings.length == 0);
                        } else {
-                               final TypeComparator<IN> sortComparator = 
((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0);
+                               final TypeComparator<IN> sortComparator = 
((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0, 
executionConfig);
        
                                Collections.sort(inputData, new 
Comparator<IN>() {
                                        @Override
@@ -193,9 +196,9 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends 
GroupReduceFunction<IN,
                ArrayList<OUT> result = new ArrayList<OUT>();
 
                if (keyColumns.length == 0) {
-                       if (mutableObjectSafeMode) {
-                               final TypeSerializer<IN> inputSerializer = 
inputType.createSerializer();
-                               TypeSerializer<OUT> outSerializer = 
getOperatorInfo().getOutputType().createSerializer();
+                       if (objectReuseDisabled) {
+                               final TypeSerializer<IN> inputSerializer = 
inputType.createSerializer(executionConfig);
+                               TypeSerializer<OUT> outSerializer = 
getOperatorInfo().getOutputType().createSerializer(executionConfig);
                                List<IN> inputDataCopy = new 
ArrayList<IN>(inputData.size());
                                for (IN in: inputData) {
                                        
inputDataCopy.add(inputSerializer.copy(in));
@@ -208,14 +211,14 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends 
GroupReduceFunction<IN,
                                function.reduce(inputData, collector);
                        }
                } else {
-                       final TypeSerializer<IN> inputSerializer = 
inputType.createSerializer();
+                       final TypeSerializer<IN> inputSerializer = 
inputType.createSerializer(executionConfig);
                        boolean[] keyOrderings = new boolean[keyColumns.length];
-                       final TypeComparator<IN> comparator = 
((CompositeType<IN>) inputType).createComparator(keyColumns, keyOrderings, 0);
+                       final TypeComparator<IN> comparator = 
((CompositeType<IN>) inputType).createComparator(keyColumns, keyOrderings, 0, 
executionConfig);
 
-                       ListKeyGroupedIterator<IN> keyedIterator = new 
ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator, 
mutableObjectSafeMode);
+                       ListKeyGroupedIterator<IN> keyedIterator = new 
ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator, 
objectReuseDisabled);
 
-                       if (mutableObjectSafeMode) {
-                               TypeSerializer<OUT> outSerializer = 
getOperatorInfo().getOutputType().createSerializer();
+                       if (objectReuseDisabled) {
+                               TypeSerializer<OUT> outSerializer = 
getOperatorInfo().getOutputType().createSerializer(executionConfig);
                                CopyingListCollector<OUT> collector = new 
CopyingListCollector<OUT>(result, outSerializer);
 
                                while (keyedIterator.nextKey()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
index 555175d..373846f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -139,7 +140,7 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends 
FlatJoinFunction<IN1, IN
 
        @SuppressWarnings("unchecked")
        @Override
-       protected List<OUT> executeOnCollections(List<IN1> inputData1, 
List<IN2> inputData2, RuntimeContext runtimeContext, boolean mutableObjectSafe) 
throws Exception {
+       protected List<OUT> executeOnCollections(List<IN1> inputData1, 
List<IN2> inputData2, RuntimeContext runtimeContext, ExecutionConfig 
executionConfig) throws Exception {
                FlatJoinFunction<IN1, IN2, OUT> function = 
userFunction.getUserCodeObject();
 
                FunctionUtils.setFunctionRuntimeContext(function, 
runtimeContext);
@@ -148,22 +149,24 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends 
FlatJoinFunction<IN1, IN
                TypeInformation<IN1> leftInformation = 
getOperatorInfo().getFirstInputType();
                TypeInformation<IN2> rightInformation = 
getOperatorInfo().getSecondInputType();
                TypeInformation<OUT> outInformation = 
getOperatorInfo().getOutputType();
+
+               boolean objectReuseDisabled = 
!executionConfig.isObjectReuseEnabled();
                
-               TypeSerializer<IN1> leftSerializer = mutableObjectSafe ? 
leftInformation.createSerializer() : null;
-               TypeSerializer<IN2> rightSerializer = mutableObjectSafe ? 
rightInformation.createSerializer() : null;
+               TypeSerializer<IN1> leftSerializer = objectReuseDisabled ? 
leftInformation.createSerializer(executionConfig) : null;
+               TypeSerializer<IN2> rightSerializer = objectReuseDisabled ? 
rightInformation.createSerializer(executionConfig) : null;
                
                TypeComparator<IN1> leftComparator;
                TypeComparator<IN2> rightComparator;
 
                if (leftInformation instanceof AtomicType) {
-                       leftComparator = ((AtomicType<IN1>) 
leftInformation).createComparator(true);
+                       leftComparator = ((AtomicType<IN1>) 
leftInformation).createComparator(true, executionConfig);
                }
                else if (leftInformation instanceof CompositeType) {
                        int[] keyPositions = getKeyColumns(0);
                        boolean[] orders = new boolean[keyPositions.length];
                        Arrays.fill(orders, true);
 
-                       leftComparator = ((CompositeType<IN1>) 
leftInformation).createComparator(keyPositions, orders, 0);
+                       leftComparator = ((CompositeType<IN1>) 
leftInformation).createComparator(keyPositions, orders, 0, executionConfig);
                }
                else {
                        throw new RuntimeException("Type information for left 
input of type " + leftInformation.getClass()
@@ -171,14 +174,14 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends 
FlatJoinFunction<IN1, IN
                }
 
                if (rightInformation instanceof AtomicType) {
-                       rightComparator = ((AtomicType<IN2>) 
rightInformation).createComparator(true);
+                       rightComparator = ((AtomicType<IN2>) 
rightInformation).createComparator(true, executionConfig);
                }
                else if (rightInformation instanceof CompositeType) {
                        int[] keyPositions = getKeyColumns(1);
                        boolean[] orders = new boolean[keyPositions.length];
                        Arrays.fill(orders, true);
 
-                       rightComparator = ((CompositeType<IN2>) 
rightInformation).createComparator(keyPositions, orders, 0);
+                       rightComparator = ((CompositeType<IN2>) 
rightInformation).createComparator(keyPositions, orders, 0, executionConfig);
                }
                else {
                        throw new RuntimeException("Type information for right 
input of type " + rightInformation.getClass()
@@ -188,7 +191,7 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends 
FlatJoinFunction<IN1, IN
                TypePairComparator<IN1, IN2> pairComparator = new 
GenericPairComparator<IN1, IN2>(leftComparator, rightComparator);
 
                List<OUT> result = new ArrayList<OUT>();
-               Collector<OUT> collector = mutableObjectSafe ? new 
CopyingListCollector<OUT>(result, outInformation.createSerializer())
+               Collector<OUT> collector = objectReuseDisabled ? new 
CopyingListCollector<OUT>(result, 
outInformation.createSerializer(executionConfig))
                                                                                
                                : new ListCollector<OUT>(result);
 
                Map<Integer, List<IN2>> probeTable = new HashMap<Integer, 
List<IN2>>();
@@ -212,7 +215,7 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends 
FlatJoinFunction<IN1, IN
                                pairComparator.setReference(left);
                                for (IN2 right : matchingHashes) {
                                        if 
(pairComparator.equalToReference(right)) {
-                                               if (mutableObjectSafe) {
+                                               if (objectReuseDisabled) {
                                                        
function.join(leftSerializer.copy(left), rightSerializer.copy(right), 
collector);
                                                } else {
                                                        function.join(left, 
right, collector);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
index 0218bfa..cde3b74 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -54,17 +55,19 @@ public class MapOperatorBase<IN, OUT, FT extends 
MapFunction<IN, OUT>> extends S
        // 
--------------------------------------------------------------------------------------------
        
        @Override
-       protected List<OUT> executeOnCollections(List<IN> inputData, 
RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
+       protected List<OUT> executeOnCollections(List<IN> inputData, 
RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
                MapFunction<IN, OUT> function = 
this.userFunction.getUserCodeObject();
                
                FunctionUtils.setFunctionRuntimeContext(function, ctx);
                FunctionUtils.openFunction(function, this.parameters);
                
                ArrayList<OUT> result = new ArrayList<OUT>(inputData.size());
+
+               boolean objectReuseDisabled = 
!executionConfig.isObjectReuseEnabled();
                
-               if (mutableObjectSafeMode) {
-                       TypeSerializer<IN> inSerializer = 
getOperatorInfo().getInputType().createSerializer();
-                       TypeSerializer<OUT> outSerializer = 
getOperatorInfo().getOutputType().createSerializer();
+               if (objectReuseDisabled) {
+                       TypeSerializer<IN> inSerializer = 
getOperatorInfo().getInputType().createSerializer(executionConfig);
+                       TypeSerializer<OUT> outSerializer = 
getOperatorInfo().getOutputType().createSerializer(executionConfig);
                        
                        for (IN element : inputData) {
                                IN inCopy = inSerializer.copy(element);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
index 7c1fcef..25b3bb8 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.CopyingIterator;
@@ -57,17 +58,19 @@ public class MapPartitionOperatorBase<IN, OUT, FT extends 
MapPartitionFunction<I
        // 
--------------------------------------------------------------------------------------------
        
        @Override
-       protected List<OUT> executeOnCollections(List<IN> inputData, 
RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
+       protected List<OUT> executeOnCollections(List<IN> inputData, 
RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
                MapPartitionFunction<IN, OUT> function = 
this.userFunction.getUserCodeObject();
                
                FunctionUtils.setFunctionRuntimeContext(function, ctx);
                FunctionUtils.openFunction(function, this.parameters);
                
                ArrayList<OUT> result = new ArrayList<OUT>(inputData.size() / 
4);
+
+               boolean objectReuseDisabled = 
!executionConfig.isObjectReuseEnabled();
                
-               if (mutableObjectSafeMode) {
-                       TypeSerializer<IN> inSerializer = 
getOperatorInfo().getInputType().createSerializer();
-                       TypeSerializer<OUT> outSerializer = 
getOperatorInfo().getOutputType().createSerializer();
+               if (objectReuseDisabled) {
+                       TypeSerializer<IN> inSerializer = 
getOperatorInfo().getInputType().createSerializer(executionConfig);
+                       TypeSerializer<OUT> outSerializer = 
getOperatorInfo().getOutputType().createSerializer(executionConfig);
                        
                        CopyingIterator<IN> source = new 
CopyingIterator<IN>(inputData.iterator(), inSerializer);
                        CopyingListCollector<OUT> resultCollector = new 
CopyingListCollector<OUT>(result, outSerializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
index 3602a82..f91d7d8 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/PartitionOperatorBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.operators.base;
 
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.NoOpFunction;
@@ -88,7 +89,7 @@ public class PartitionOperatorBase<IN> extends 
SingleInputOperator<IN, IN, NoOpF
        // 
--------------------------------------------------------------------------------------------
        
        @Override
-       protected List<IN> executeOnCollections(List<IN> inputData, 
RuntimeContext runtimeContext, boolean mutableObjectSafeMode) {
+       protected List<IN> executeOnCollections(List<IN> inputData, 
RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
                return inputData;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
index f1bf0e9..d3d61e9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -149,12 +150,13 @@ public class ReduceOperatorBase<T, FT extends 
ReduceFunction<T>> extends SingleI
        // 
--------------------------------------------------------------------------------------------
        
        @Override
-       protected List<T> executeOnCollections(List<T> inputData, 
RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
+       protected List<T> executeOnCollections(List<T> inputData, 
RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
                // make sure we can handle empty inputs
                if (inputData.isEmpty()) {
                        return Collections.emptyList();
                }
-               
+
+               boolean objectReuseDisabled = 
!executionConfig.isObjectReuseEnabled();
                ReduceFunction<T> function = 
this.userFunction.getUserCodeObject();
 
                UnaryOperatorInformation<T, T> operatorInfo = getOperatorInfo();
@@ -169,11 +171,11 @@ public class ReduceOperatorBase<T, FT extends 
ReduceFunction<T>> extends SingleI
                FunctionUtils.setFunctionRuntimeContext(function, ctx);
                FunctionUtils.openFunction(function, this.parameters);
 
-               TypeSerializer<T> serializer = 
getOperatorInfo().getInputType().createSerializer();
+               TypeSerializer<T> serializer = 
getOperatorInfo().getInputType().createSerializer(executionConfig);
 
                if (inputColumns.length > 0) {
                        boolean[] inputOrderings = new 
boolean[inputColumns.length];
-                       TypeComparator<T> inputComparator = ((CompositeType<T>) 
inputType).createComparator(inputColumns, inputOrderings, 0);
+                       TypeComparator<T> inputComparator = ((CompositeType<T>) 
inputType).createComparator(inputColumns, inputOrderings, 0, executionConfig);
 
                        Map<TypeComparable<T>, T> aggregateMap = new 
HashMap<TypeComparable<T>, T>(inputData.size() / 10);
 
@@ -183,7 +185,7 @@ public class ReduceOperatorBase<T, FT extends 
ReduceFunction<T>> extends SingleI
                                T existing = aggregateMap.get(wrapper);
                                T result;
 
-                               if (mutableObjectSafeMode) {
+                               if (objectReuseDisabled) {
                                        if (existing != null) {
                                                result = 
function.reduce(existing, serializer.copy(next));
                                        } else {
@@ -209,7 +211,7 @@ public class ReduceOperatorBase<T, FT extends 
ReduceFunction<T>> extends SingleI
                else {
                        T aggregate = inputData.get(0);
                        
-                       if (mutableObjectSafeMode) {
+                       if (objectReuseDisabled) {
                                aggregate = serializer.copy(aggregate);
                                
                                for (int i = 1; i < inputData.size(); i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java
index 10dbbfe..2fc8a1b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/AtomicType.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.typeinfo;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 
 
@@ -26,5 +27,5 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
  */
 public interface AtomicType<T> {
        
-       TypeComparator<T> createComparator(boolean sortOrderAscending);
+       TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
index 646a549..80f5f63 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.typeinfo;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
 import org.apache.flink.api.common.functions.InvalidTypesException;
@@ -94,12 +95,12 @@ public class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
 
        @Override
        @SuppressWarnings("unchecked")
-       public TypeSerializer<T> createSerializer() {
+       public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
                // special case the string array
                if (componentClass.equals(String.class)) {
                        return (TypeSerializer<T>) StringArraySerializer.INSTANCE;
                } else {
-                       return (TypeSerializer<T>) new GenericArraySerializer<C>(this.componentClass, this.componentInfo.createSerializer());
+                       return (TypeSerializer<T>) new GenericArraySerializer<C>(this.componentClass, this.componentInfo.createSerializer(executionConfig));
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index 61d830a..7bf7298 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -23,6 +23,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -114,12 +115,12 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
        }
        
        @Override
-       public TypeSerializer<T> createSerializer() {
+       public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
                return this.serializer;
        }
        
        @Override
-       public TypeComparator<T> createComparator(boolean sortOrderAscending) {
+       public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
                if (comparatorClass != null) {
                        return instantiateComparator(comparatorClass, sortOrderAscending);
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
index dba0e6f..367670c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.api.common.typeinfo;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.types.Nothing;
 
@@ -54,7 +55,7 @@ public class NothingTypeInfo extends TypeInformation<Nothing> {
        }
 
        @Override
-       public TypeSerializer<Nothing> createSerializer() {
+       public TypeSerializer<Nothing> createSerializer(ExecutionConfig executionConfig) {
                throw new RuntimeException("The Nothing type cannot have a serializer.");
        }
 }

Reply via email to