Repository: flink
Updated Branches:
  refs/heads/master 6b402f43d -> 7407076d3


http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
index d2b2032..6c53b6b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
@@ -71,7 +71,7 @@ public class StreamJoinOperator<I1, I2> extends
                 */
                public JoinPredicate<I1, I2> where(int... fields) {
                        return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys(
-                                       new Keys.ExpressionKeys<I1>(fields, type1), type1));
+                                       new Keys.ExpressionKeys<I1>(fields, type1), type1, op.input1.getExecutionEnvironment().getConfig()));
                }
 
                /**
@@ -88,7 +88,7 @@ public class StreamJoinOperator<I1, I2> extends
                 */
                public JoinPredicate<I1, I2> where(String... fields) {
                        return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys(
-                                       new Keys.ExpressionKeys<I1>(fields, type1), type1));
+                                       new Keys.ExpressionKeys<I1>(fields, type1), type1, op.input1.getExecutionEnvironment().getConfig()));
                }
 
                /**
@@ -158,7 +158,7 @@ public class StreamJoinOperator<I1, I2> extends
                 */
                public JoinedStream<I1, I2> equalTo(int... fields) {
                        keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields, type2),
-                                       type2);
+                                       type2, op.input1.getExecutionEnvironment().getConfig());
                        return createJoinOperator();
                }
 
@@ -177,7 +177,7 @@ public class StreamJoinOperator<I1, I2> extends
                 */
                public JoinedStream<I1, I2> equalTo(String... fields) {
                        this.keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields,
-                                       type2), type2);
+                                       type2), type2, op.input1.getExecutionEnvironment().getConfig());
                        return createJoinOperator();
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 45c14c1..65dde79 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
 
+import com.esotericsoftware.kryo.Serializer;
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
@@ -30,6 +31,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
@@ -78,15 +80,7 @@ public abstract class StreamExecutionEnvironment {
         * Constructor for creating StreamExecutionEnvironment
         */
        protected StreamExecutionEnvironment() {
-               streamGraph = new StreamGraph();
-       }
-
-       /**
-        * Sets the config object.
-        */
-       public void setConfig(ExecutionConfig config) {
-               Validate.notNull(config);
-               this.config = config;
+               streamGraph = new StreamGraph(config);
        }
 
        /**
@@ -181,6 +175,57 @@ public abstract class StreamExecutionEnvironment {
        }
 
        // --------------------------------------------------------------------------------------------
+       //  Registry for types and serializers
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Registers the given Serializer as a default serializer for the given type at the
+        * {@link org.apache.flink.api.java.typeutils.runtime.KryoSerializer}.
+        *
+        * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
+        * because it may be distributed to the worker nodes by java serialization.
+        *
+        * @param type The class of the types serialized with the given serializer.
+        * @param serializer The serializer to use.
+        */
+       public void registerKryoSerializer(Class<?> type, Serializer<?> serializer) {
+               config.registerKryoSerializer(type, serializer);
+       }
+
+       /**
+        * Registers the given Serializer via its class as a serializer for the given type at the
+        * {@link org.apache.flink.api.java.typeutils.runtime.KryoSerializer}.
+        *
+        * @param type The class of the types serialized with the given serializer.
+        * @param serializerClass The class of the serializer to use.
+        */
+       public void registerKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
+               config.registerKryoSerializer(type, serializerClass);
+       }
+
+       /**
+        * Registers the given type with the serialization stack. If the type is eventually
+        * serialized as a POJO, then the type is registered with the POJO serializer. If the
+        * type ends up being serialized with Kryo, then it will be registered at Kryo to make
+        * sure that only tags are written.
+        *
+        * @param type The class of the type to register.
+        */
+       public void registerType(Class<?> type) {
+               if (type == null) {
+                       throw new NullPointerException("Cannot register null type class.");
+               }
+
+               TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(type);
+
+               if (typeInfo instanceof PojoTypeInfo) {
+                       config.registerPojoType(type);
+               } else {
+                       config.registerKryoType(type);
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
        // Data stream creations
        // --------------------------------------------------------------------------------------------
 

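A minimal usage sketch of the registration hooks added above, assuming a hypothetical POJO type MyPojo and a hypothetical custom Kryo serializer MyPojoSerializer (neither exists in the repository); the registerType/registerKryoSerializer calls correspond to the new StreamExecutionEnvironment methods:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RegistrationSketch {

    // Hypothetical user type; the TypeExtractor should classify it as a PojoTypeInfo.
    public static class MyPojo {
        public String word;
        public int count;
    }

    // Hypothetical custom Kryo serializer. When registered as an instance it must
    // also be java.io.Serializable, since it may be shipped to the worker nodes.
    public static class MyPojoSerializer extends Serializer<MyPojo> implements java.io.Serializable {
        @Override
        public void write(Kryo kryo, Output output, MyPojo pojo) {
            output.writeString(pojo.word);
            output.writeInt(pojo.count);
        }

        @Override
        public MyPojo read(Kryo kryo, Input input, Class<MyPojo> type) {
            MyPojo pojo = new MyPojo();
            pojo.word = input.readString();
            pojo.count = input.readInt();
            return pojo;
        }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // POJO types are registered with the POJO serializer, everything else at Kryo.
        env.registerType(MyPojo.class);

        // Default Kryo serializers can be registered by instance or by class.
        env.registerKryoSerializer(MyPojo.class, new MyPojoSerializer());
        env.registerKryoSerializer(MyPojo.class, MyPojoSerializer.class);
    }
}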
http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
index 1c273d3..3704e3b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.streaming.api.function.aggregation;
 
-import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
 
-public abstract class AggregationFunction<T> implements ReduceFunction<T> {
+public abstract class AggregationFunction<T> extends RichReduceFunction<T> {
        private static final long serialVersionUID = 1L;
 
        public int position;

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
index 226c45a..7f7cf0b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
@@ -193,7 +193,7 @@ public abstract class ComparableAggregator<T> extends AggregationFunction<T> {
 
                        if (cType instanceof PojoTypeInfo) {
                                pojoComparator = (PojoComparator<T>) cType.createComparator(
-                                               new int[] { logicalKeyPosition }, new boolean[] { false }, 0);
+                                               new int[] { logicalKeyPosition }, new boolean[] { false }, 0, getRuntimeContext().getExecutionConfig());
                        } else {
                                throw new IllegalArgumentException(
                                                "Key expressions are only 
supported on POJO types. "

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
index 142028b..74e4597 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
@@ -146,7 +146,7 @@ public abstract class SumAggregator {
 
                        if (cType instanceof PojoTypeInfo) {
                                comparator = (PojoComparator<T>) cType.createComparator(
-                                               new int[] { logicalKeyPosition }, new boolean[] { false }, 0);
+                                               new int[] { logicalKeyPosition }, new boolean[] { false }, 0, getRuntimeContext().getExecutionConfig());
                        } else {
                                throw new IllegalArgumentException(
                                                "Key expressions are only 
supported on POJO types. "

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index a5ef3a7..6d1441a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -45,8 +45,8 @@ public class FileSourceFunction extends RichSourceFunction<String> {
                this.serializerFactory = createSerializer(typeInfo);
        }
 
-       private static TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) {
-               TypeSerializer<String> serializer = typeInfo.createSerializer();
+       private TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) {
+               TypeSerializer<String> serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
 
                return new RuntimeSerializerFactory<String>(serializer, typeInfo.getTypeClass());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index 6cee5f2..793c952 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.invokable;
 import java.io.IOException;
 import java.io.Serializable;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -48,6 +49,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 
        protected StreamTaskContext<OUT> taskContext;
 
+       protected ExecutionConfig executionConfig = null;
+
        protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
        protected StreamRecordSerializer<IN> inSerializer;
        protected TypeSerializer<IN> objectSerializer;
@@ -67,11 +70,12 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 
        /**
         * Initializes the {@link StreamInvokable} for input and output handling
-        * 
+        *
         * @param taskContext
         *            StreamTaskContext representing the vertex
+        * @param executionConfig
         */
-       public void setup(StreamTaskContext<OUT> taskContext) {
+       public void setup(StreamTaskContext<OUT> taskContext, ExecutionConfig executionConfig) {
                this.collector = taskContext.getOutputCollector();
                this.recordIterator = taskContext.getInput(0);
                this.inSerializer = taskContext.getInputSerializer(0);
@@ -80,6 +84,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
                        this.objectSerializer = inSerializer.getObjectSerializer();
                }
                this.taskContext = taskContext;
+               this.executionConfig = executionConfig;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
index 997463c..df2edd2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
@@ -354,7 +354,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
                                        clonedDistributedEvictionPolicies);
                }
 
-               groupInvokable.setup(taskContext);
+               groupInvokable.setup(taskContext, executionConfig);
                groupInvokable.open(this.parameters);
                windowingGroups.put(keySelector.getKey(element.getObject()), groupInvokable);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
index c9d9e5a..69c7cee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -35,7 +35,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
                super(null);
                this.fields = fields;
                this.numFields = this.fields.length;
-               this.outTypeSerializer = outTypeInformation.createSerializer();
+               this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 604873e..9f98db3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.invokable.operator.co;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
@@ -46,7 +47,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
        protected TypeSerializer<IN2> serializer2;
 
        @Override
-       public void setup(StreamTaskContext<OUT> taskContext) {
+       public void setup(StreamTaskContext<OUT> taskContext, ExecutionConfig executionConfig) {
                this.collector = taskContext.getOutputCollector();
 
                this.recordIterator = taskContext.getCoReader();

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
index 98f12ec..cd68937 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.streamrecord;
 
 import java.io.IOException;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
@@ -32,8 +33,8 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
        private final TypeSerializer<T> typeSerializer;
        private final boolean isTuple;
 
-       public StreamRecordSerializer(TypeInformation<T> typeInfo) {
-               this.typeSerializer = typeInfo.createSerializer();
+       public StreamRecordSerializer(TypeInformation<T> typeInfo, ExecutionConfig executionConfig) {
+               this.typeSerializer = typeInfo.createSerializer(executionConfig);
                this.isTuple = typeInfo.isTupleType();
        }
 

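A small sketch of the migrated StreamRecordSerializer construction, mirroring what the mock test contexts further down now do; BasicTypeInfo.STRING_TYPE_INFO is used here purely as an example element type:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;

public class StreamRecordSerializerSketch {
    public static void main(String[] args) {
        // The ExecutionConfig is now required so that the wrapped TypeSerializer
        // can take registered types and serializers into account.
        StreamRecordSerializer<String> serializer =
                new StreamRecordSerializer<String>(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
        System.out.println(serializer.getClass().getName());
    }
}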
http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index 2b650be..83cdcd1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -68,7 +68,7 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
        @Override
        protected void setInvokable() {
                userInvokable = configuration.getUserInvokable(userClassLoader);
-               userInvokable.setup(this);
+               userInvokable.setup(this, getExecutionConfig());
        }
 
        protected void setConfigInputs() throws StreamVertexException {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 994b1fa..024e415 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -97,7 +97,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 
        protected void setInvokable() {
                userInvokable = configuration.getUserInvokable(userClassLoader);
-               userInvokable.setup(this);
+               userInvokable.setup(this, getExecutionConfig());
        }
 
        public String getName() {
@@ -111,7 +111,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
        public StreamingRuntimeContext createRuntimeContext(String taskName,
                        Map<String, OperatorState<?>> states) {
                Environment env = getEnvironment();
-               return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(), states);
+               return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(), getExecutionConfig(), states);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
index a1a64e2..0daf3c2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.streamvertex;
 
 import java.util.Map;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.configuration.Configuration;
@@ -38,9 +39,9 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
        private final Map<String, OperatorState<?>> operatorStates;
 
        public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
-                       Map<String, OperatorState<?>> operatorStates) {
+                       ExecutionConfig executionConfig, Map<String, OperatorState<?>> operatorStates) {
                super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(),
-                               userCodeClassLoader, env.getCopyTask());
+                               userCodeClassLoader, executionConfig, env.getCopyTask());
                this.env = env;
                this.operatorStates = operatorStates;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index 08afd0d..77467b5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.util.keys;
 
 import java.lang.reflect.Array;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -60,12 +61,13 @@ public class KeySelectorUtil {
                        Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class,
                        Tuple25.class };
 
-       public static <X> KeySelector<X, ?> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo) {
+       public static <X> KeySelector<X, ?> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig) {
                int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
                int keyLength = logicalKeyPositions.length;
                boolean[] orders = new boolean[keyLength];
+               // TODO: Fix using KeySelector everywhere
                TypeComparator<X> comparator = ((CompositeType<X>) typeInfo).createComparator(
-                               logicalKeyPositions, orders, 0);
+                               logicalKeyPositions, orders, 0, executionConfig);
                return new ComparableKeySelector<X>(comparator, keyLength);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 49cd497..115f614 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -118,7 +119,7 @@ public class AggregationFunctionTest {
 
                KeySelector<Tuple2<Integer, Integer>, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
                                new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[] { 0 }, typeInfo),
-                               typeInfo);
+                               typeInfo, new ExecutionConfig());
 
                List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
                                new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, keySelector),

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
index ea94f98..a6560ae 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -57,9 +58,9 @@ public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> {
                this.inputIterator2 = input2.iterator();
 
                TypeInformation<IN1> inTypeInfo1 = TypeExtractor.getForObject(input1.iterator().next());
-               inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1);
+               inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1, new ExecutionConfig());
                TypeInformation<IN2> inTypeInfo2 = TypeExtractor.getForObject(input2.iterator().next());
-               inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2);
+               inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2, new ExecutionConfig());
 
                mockIterator = new MockCoReaderIterator(inDeserializer1, inDeserializer2);
 
@@ -154,7 +155,7 @@ public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> {
        public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1, IN2, OUT> invokable,
                        List<IN1> input1, List<IN2> input2) {
                MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2);
-               invokable.setup(mockContext);
+               invokable.setup(mockContext, new ExecutionConfig());
 
                try {
                        invokable.open(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 5537052..81467dc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.StreamConfig;
@@ -49,7 +50,7 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT> {
                }
 
                TypeInformation<IN> inTypeInfo = TypeExtractor.getForObject(inputs.iterator().next());
-               inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo);
+               inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo, new ExecutionConfig());
 
                iterator = new MockInputIterator();
                outputs = new ArrayList<OUT>();
@@ -104,7 +105,7 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT> {
        public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT> invokable,
                        List<IN> inputs) {
                MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
-               invokable.setup(mockContext);
+               invokable.setup(mockContext, new ExecutionConfig());
                try {
                        invokable.open(null);
                        invokable.invoke();

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 177a9ee..23495a5 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.scala
 
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
   SingleOutputStreamOperator, GroupedDataStream}
 import scala.reflect.ClassTag
@@ -41,7 +42,6 @@ import org.apache.flink.streaming.api.collector.OutputSelector
 import scala.collection.JavaConversions._
 import java.util.HashMap
 import org.apache.flink.streaming.api.function.aggregation.SumFunction
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
@@ -290,7 +290,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
     val jStream = javaStream.asInstanceOf[JavaStream[Product]]
     val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
 
-    val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
+    val agg = new ScalaStreamingAggregator[Product](
+      jStream.getType().createSerializer(javaStream.getExecutionEnvironment.getConfig),
+      position)
 
     val reducer = aggregationType match {
      case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index a408ec0..06271fd 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.api.scala
 
+import org.apache.flink.api.common.ExecutionConfig
+
 import scala.reflect.ClassTag
 import org.apache.commons.lang.Validate
 import org.apache.flink.api.common.functions.CrossFunction
@@ -44,10 +46,10 @@ class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extend
 
      classOf[(I1, I2)], Seq(input1.getType, input2.getType), Array("_1", "_2")) {
 
-      override def createSerializer: TypeSerializer[(I1, I2)] = {
+      override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[(I1, I2)] = {
        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
         for (i <- 0 until getArity) {
-          fieldSerializers(i) = types(i).createSerializer
+          fieldSerializers(i) = types(i).createSerializer(executionConfig)
         }
 
        new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 394673c..bc9b422 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -18,6 +18,9 @@
 
 package org.apache.flink.streaming.api.scala
 
+import com.esotericsoftware.kryo.Serializer
+import org.apache.flink.api.java.typeutils.runtime.KryoSerializer
+
 import scala.reflect.ClassTag
 import org.apache.commons.lang.Validate
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -73,6 +76,33 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   def getBufferTimout: Long = javaEnv.getBufferTimeout()
 
   /**
+   * Registers the given Serializer as a default serializer for the given class at the
+   * [[KryoSerializer]].
+   */
+  def registerKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
+    javaEnv.registerKryoSerializer(clazz, serializer)
+  }
+
+  /**
+   * Registers the given Serializer as a default serializer for the given class at the
+   * [[KryoSerializer]]
+   */
+  def registerKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]) {
+    javaEnv.registerKryoSerializer(clazz, serializer)
+  }
+
+  /**
+   * Registers the given type with the serialization stack. If the type is eventually
+   * serialized as a POJO, then the type is registered with the POJO serializer. If the
+   * type ends up being serialized with Kryo, then it will be registered at Kryo to make
+   * sure that only tags are written.
+   *
+   */
+  def registerType(typeClass: Class[_]) {
+    javaEnv.registerType(typeClass)
+  }
+
+  /**
    * Creates a DataStream that represents the Strings produced by reading the
    * given file line wise. The file will be read with the system's default
    * character set.

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index 35a94cd..67f7aae 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.api.scala
 
+import org.apache.flink.api.common.ExecutionConfig
+
 import scala.Array.canBuildFrom
 import scala.reflect.ClassTag
 import org.apache.commons.lang.Validate
@@ -46,7 +48,7 @@ TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
 
 object StreamJoinOperator {
 
-  class JoinWindow[I1, I2](private[flink]op: StreamJoinOperator[I1, I2]) extends 
+  class JoinWindow[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2]) extends
   TemporalWindow[JoinWindow[I1, I2]] {
 
     private[flink] val type1 = op.input1.getType()
@@ -59,7 +61,9 @@ object StreamJoinOperator {
      */
     def where(fields: Int*) = {
       new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
-        new Keys.ExpressionKeys(fields.toArray, type1), type1))
+        new Keys.ExpressionKeys(fields.toArray, type1),
+        type1,
+        op.input1.getExecutionEnvironment.getConfig))
     }
 
     /**
@@ -70,7 +74,9 @@ object StreamJoinOperator {
      */
     def where(firstField: String, otherFields: String*) =
       new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys(
-        new Keys.ExpressionKeys(firstField +: otherFields.toArray, type1), type1))
+        new Keys.ExpressionKeys(firstField +: otherFields.toArray, type1),
+        type1,
+        op.input1.getExecutionEnvironment.getConfig))
 
     /**
      * Continues a temporal Join transformation by defining
@@ -112,7 +118,9 @@ object StreamJoinOperator {
      */
     def equalTo(fields: Int*): JoinedStream[I1, I2] = {
       finish(KeySelectorUtil.getSelectorForKeys(
-        new Keys.ExpressionKeys(fields.toArray, type2), type2))
+        new Keys.ExpressionKeys(fields.toArray, type2),
+        type2,
+        op.input1.getExecutionEnvironment.getConfig))
     }
 
     /**
@@ -123,7 +131,9 @@ object StreamJoinOperator {
      */
    def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] =
       finish(KeySelectorUtil.getSelectorForKeys(
-        new Keys.ExpressionKeys(firstField +: otherFields.toArray, type2), type2))
+        new Keys.ExpressionKeys(firstField +: otherFields.toArray, type2),
+        type2,
+        op.input1.getExecutionEnvironment.getConfig))
 
     /**
      * Creates a temporal join transformation by defining the second join key.
@@ -151,10 +161,11 @@ object StreamJoinOperator {
 
        classOf[(I1, I2)], Seq(op.input1.getType, op.input2.getType), Array("_1", "_2")) {
 
-        override def createSerializer: TypeSerializer[(I1, I2)] = {
+        override def createSerializer(
+            executionConfig: ExecutionConfig): TypeSerializer[(I1, I2)] = {
          val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
           for (i <- 0 until getArity) {
-            fieldSerializers(i) = types(i).createSerializer
+            fieldSerializers(i) = types(i).createSerializer(executionConfig)
           }
 
          new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
index 5c734bf..33bbc67 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.api.scala
 
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+
 import scala.Array.canBuildFrom
 import scala.collection.JavaConversions.iterableAsScalaIterable
 import scala.reflect.ClassTag
@@ -26,7 +28,6 @@ import org.apache.flink.api.common.functions.GroupReduceFunction
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
 import org.apache.flink.streaming.api.datastream.{WindowedDataStream => JavaWStream}
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
@@ -234,7 +235,10 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
     val jStream = javaStream.asInstanceOf[JavaWStream[Product]]
     val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
 
-    val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
+    val agg = new ScalaStreamingAggregator[Product](
+      jStream.getType().createSerializer(
+        javaStream.getDataStream.getExecutionEnvironment.getConfig),
+      position)
 
     val reducer = aggregationType match {
       case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSubclassInterfacePOJOITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSubclassInterfacePOJOITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSubclassInterfacePOJOITCase.java
new file mode 100644
index 0000000..05ffc88
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSubclassInterfacePOJOITCase.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.exampleJavaPrograms;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+@SuppressWarnings("serial")
+public class WordCountSubclassInterfacePOJOITCase extends JavaProgramTestBase implements Serializable {
+       private static final long serialVersionUID = 1L;
+       protected String textPath;
+       protected String resultPath;
+
+
+       @Override
+       protected void preSubmit() throws Exception {
+               textPath = createTempFile("text.txt", WordCountData.TEXT);
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<String> text = env.readTextFile(textPath);
+
+               DataSet<WCBase> counts = text
+                               .flatMap(new Tokenizer())
+                               .groupBy("word")
+                               .reduce(new ReduceFunction<WCBase>() {
+                                       private static final long serialVersionUID = 1L;
+                                       public WCBase reduce(WCBase value1, WCBase value2) {
+                                               WC wc1 = (WC) value1;
+                                               WC wc2 = (WC) value2;
+                                               int c = wc1.secretCount.getCount() + wc2.secretCount.getCount();
+                                               wc1.secretCount.setCount(c);
+                                               return wc1;
+                                       }
+                               })
+                               .map(new MapFunction<WCBase, WCBase>() {
+                                       @Override
+                                       public WCBase map(WCBase value) throws Exception {
+                                               WC wc = (WC) value;
+                                               wc.count = wc.secretCount.getCount();
+                                               return wc;
+                                       }
+                               });
+
+               counts.writeAsText(resultPath);
+
+               env.execute("WordCount with custom data types example");
+       }
+
+       public static final class Tokenizer implements FlatMapFunction<String, WCBase> {
+
+               @Override
+               public void flatMap(String value, Collector<WCBase> out) {
+                       // normalize and split the line
+                       String[] tokens = value.toLowerCase().split("\\W+");
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new WC(token, 1));
+                               }
+                       }
+               }
+       }
+
+       public static abstract class WCBase {
+               public String word;
+               public int count;
+
+               public WCBase(String w, int c) {
+                       this.word = w;
+                       this.count = c;
+               }
+               @Override
+               public String toString() {
+                       return word+" "+count;
+               }
+       }
+
+       public static interface CrazyCounter {
+               public int getCount();
+               public void setCount(int c);
+       }
+
+       public static class CrazyCounterImpl implements CrazyCounter {
+               public int countz;
+
+               public CrazyCounterImpl() {
+               }
+
+               public CrazyCounterImpl(int c) {
+                       this.countz = c;
+               }
+
+               @Override
+               public int getCount() {
+                       return countz;
+               }
+
+               @Override
+               public void setCount(int c) {
+                       this.countz = c;
+               }
+
+       }
+
+       public static class WC extends WCBase {
+               public CrazyCounter secretCount;
+
+               public WC() {
+                       super(null, 0);
+               }
+
+               public WC(String w, int c) {
+                       super(w, 0);
+                       this.secretCount = new CrazyCounterImpl(c);
+               }
+
+       }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSubclassPOJOITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSubclassPOJOITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSubclassPOJOITCase.java
new file mode 100644
index 0000000..f74ee16
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountSubclassPOJOITCase.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.exampleJavaPrograms;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+@SuppressWarnings("serial")
+public class WordCountSubclassPOJOITCase extends JavaProgramTestBase implements Serializable {
+       private static final long serialVersionUID = 1L;
+       protected String textPath;
+       protected String resultPath;
+
+       
+       @Override
+       protected void preSubmit() throws Exception {
+               textPath = createTempFile("text.txt", WordCountData.TEXT);
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
+       }
+       
+       @Override
+       protected void testProgram() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<String> text = env.readTextFile(textPath);
+
+               DataSet<WCBase> counts = text
+                               .flatMap(new Tokenizer())
+                               .groupBy("word")
+                               .reduce(new ReduceFunction<WCBase>() {
+                                       private static final long serialVersionUID = 1L;
+                                       public WCBase reduce(WCBase value1, WCBase value2) {
+                                               WC wc1 = (WC) value1;
+                                               WC wc2 = (WC) value2;
+                                               return new WC(value1.word, wc1.secretCount + wc2.secretCount);
+                                       }
+                               })
+                               .map(new MapFunction<WCBase, WCBase>() {
+                                       @Override
+                                       public WCBase map(WCBase value) throws Exception {
+                                               WC wc = (WC) value;
+                                               wc.count = wc.secretCount;
+                                               return wc;
+                                       }
+                               });
+
+               counts.writeAsText(resultPath);
+
+               env.execute("WordCount with custom data types example");
+       }
+
+       public static final class Tokenizer implements FlatMapFunction<String, WCBase> {
+
+               @Override
+               public void flatMap(String value, Collector<WCBase> out) {
+                       // normalize and split the line
+                       String[] tokens = value.toLowerCase().split("\\W+");
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new WC(token, 1));
+                               }
+                       }
+               }
+       }
+
+       public static abstract class WCBase {
+               public String word;
+               public int count;
+
+               public WCBase(String w, int c) {
+                       this.word = w;
+                       this.count = c;
+               }
+               @Override
+               public String toString() {
+                       return word+" "+count;
+               }
+       }
+
+       public static class WC extends WCBase {
+
+               public int secretCount;
+
+               public WC() {
+                       super(null, 0);
+               }
+
+               public WC(String w, int c) {
+                       super(w, 0);
+                       this.secretCount = c;
+               }
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
index 0a0e50d..84a0032 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.io
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.java.io.CollectionInputFormat
 import org.junit.Assert.assertEquals
 import org.junit.Assert.assertNotNull
@@ -55,8 +56,11 @@ class CollectionInputFormatTest {
    val inputCollection = Seq(new ElementType(1), new ElementType(2), new ElementType(3))
     val info = createTypeInformation[ElementType]
 
-    val inputFormat: CollectionInputFormat[ElementType] = new
-        CollectionInputFormat[ElementType](inputCollection.asJava, info.createSerializer())
+    val inputFormat: CollectionInputFormat[ElementType] = {
+      new CollectionInputFormat[ElementType](
+        inputCollection.asJava,
+        info.createSerializer(new ExecutionConfig))
+    }
 
     val buffer = new ByteArrayOutputStream
     val out = new ObjectOutputStream(buffer)
@@ -107,7 +111,7 @@ class CollectionInputFormatTest {
 
     val inputFormat = new CollectionInputFormat[String](
       data.asJava,
-      BasicTypeInfo.STRING_TYPE_INFO.createSerializer)
+      BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig))
     val baos = new ByteArrayOutputStream
     val oos = new ObjectOutputStream(baos)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
index 4f8816f..157aa0d 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
@@ -22,6 +22,7 @@ import java.io.File
 import java.util.Random
 import java.io.BufferedWriter
 import java.io.FileWriter
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.scala._
 import java.io.BufferedReader
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
@@ -81,8 +82,12 @@ class MassiveCaseClassSortingITCase {
         val typeInfo = implicitly[TypeInformation[StringTuple]]
           .asInstanceOf[CompositeType[StringTuple]]
         
-        val serializer = typeInfo.createSerializer()
-        val comparator = typeInfo.createComparator(Array(0, 1), Array(true, true), 0)
+        val serializer = typeInfo.createSerializer(new ExecutionConfig)
+        val comparator = typeInfo.createComparator(
+          Array(0, 1),
+          Array(true, true),
+          0,
+          new ExecutionConfig)
         
         val mm = new DefaultMemoryManager(1024 * 1024, 1)
         val ioMan = new IOManagerAsync()

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
index 4718395..21c6581 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.runtime
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.junit.Test
 import org.junit.Assert._
 import org.apache.flink.api.scala._
@@ -48,9 +49,13 @@ class CaseClassComparatorTest {
      val typeInfo = implicitly[TypeInformation[CaseTestClass]]
                                   .asInstanceOf[CompositeType[CaseTestClass]]
       
-      val serializer = typeInfo.createSerializer()
+      val serializer = typeInfo.createSerializer(new ExecutionConfig)
       val comparator = new FailingCompareDeserializedWrapper(
-          typeInfo.createComparator(Array[Int](0, 2), Array[Boolean](true, true), 0))
+          typeInfo.createComparator(
+            Array[Int](0, 2),
+            Array[Boolean](true, true),
+            0,
+            new ExecutionConfig))
       
       assertTrue(comparator.supportsNormalizedKey())
       assertEquals(8, comparator.getNormalizeKeyLen())

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
index c396f9f..ce4efb3 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
@@ -17,13 +17,15 @@
  */
 package org.apache.flink.api.scala.runtime
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeutils.SerializerTestInstance
+import org.apache.flink.api.java.ExecutionEnvironment
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.api.java.typeutils.runtime.KryoSerializer
 import org.joda.time.DateTime
 import org.junit.Test
 import scala.reflect._
 import org.joda.time.LocalDate
-import org.apache.flink.api.java.typeutils.runtime.KryoSerializer
 import com.esotericsoftware.kryo.Serializer
 import com.esotericsoftware.kryo.Kryo
 import com.esotericsoftware.kryo.io.Output
@@ -94,8 +96,6 @@ class KryoGenericTypeSerializerTest {
   def jodaSerialization: Unit = {
     val a = List(new LocalDate(1), new LocalDate(2))
     
-    KryoSerializer.registerSerializer(classOf[LocalDate], new LocalDateSerializer())
-
     runTests(a)
   }
 
@@ -191,8 +191,13 @@ class KryoGenericTypeSerializerTest {
 
   def runTests[T : ClassTag](objects: Seq[T]): Unit ={
     val clsTag = classTag[T]
+
+
+    // Register the custom Kryo Serializer
+    val conf = new ExecutionConfig
+    conf.registerKryoSerializer(classOf[LocalDate], classOf[LocalDateSerializer])
     val typeInfo = new GenericTypeInfo[T](clsTag.runtimeClass.asInstanceOf[Class[T]])
-    val serializer = typeInfo.createSerializer()
+    val serializer = typeInfo.createSerializer(conf)
     val typeClass = typeInfo.getTypeClass
 
     val instance = new SerializerTestInstance[T](serializer, typeClass, -1, objects: _*)

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
index fc51c0c..c86fde0 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.runtime
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.java.typeutils.runtime.KryoSerializer
 import org.junit.Assert._
 
@@ -90,7 +91,7 @@ class ScalaSpecialTypesSerializerTest {
   private final def runTests[T : TypeInformation](instances: Array[T]) {
     try {
       val typeInfo = implicitly[TypeInformation[T]]
-      val serializer = typeInfo.createSerializer
+      val serializer = typeInfo.createSerializer(new ExecutionConfig)
       val typeClass = typeInfo.getTypeClass
       val test =
         new ScalaSpecialTypesSerializerTestInstance[T](serializer, typeClass, -1, instances)
@@ -116,8 +117,9 @@ class ScalaSpecialTypesSerializerTestInstance[T](
   override def testInstantiate(): Unit = {
     try {
       val serializer: TypeSerializer[T] = getSerializer
-      val instance: T = serializer.createInstance
       if (!serializer.isInstanceOf[KryoSerializer[_]]) {
+        // kryo serializer does return null, so only test for non-kryo-serializers
+        val instance: T = serializer.createInstance
         assertNotNull("The created instance must not be null.", instance)
       }
       val tpe: Class[T] = getTypeClass

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
index 84ff4a6..65648b6 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.runtime
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.InvalidTypesException
 import org.junit.Assert._
 
@@ -104,7 +105,7 @@ class TraversableSerializerTest {
    val testData = Array(Array((1, "String"), (2, "Foo")), Array((4, "String"), (3, "Foo")))
     runTests(testData)
   }
-//
+
   @Test
   def testWithCaseClass(): Unit = {
    val testData = Array(Seq((1, "String"), (2, "Foo")), Seq((4, "String"), (3, "Foo")))
@@ -132,7 +133,7 @@ class TraversableSerializerTest {
   private final def runTests[T : TypeInformation](instances: Array[T]) {
     try {
       val typeInfo = implicitly[TypeInformation[T]]
-      val serializer = typeInfo.createSerializer
+      val serializer = typeInfo.createSerializer(new ExecutionConfig)
       val typeClass = typeInfo.getTypeClass
       val test =
         new ScalaSpecialTypesSerializerTestInstance[T](serializer, typeClass, -1, instances)

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala
index 8ccbc83..8b1a180 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala
@@ -17,8 +17,8 @@
  */
 package org.apache.flink.api.scala.runtime
 
-import org.apache.flink.api.common.typeutils.TypeComparator
-import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
 
@@ -30,12 +30,12 @@ class TupleComparatorILD2Test extends TupleComparatorTestBase[(Int, Long, Double
   protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
     val ti = createTypeInformation[(Int, Long, Double)]
     ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
-      .createComparator(Array(0, 1), Array(ascending, ascending), 0)
+      .createComparator(Array(0, 1), Array(ascending, ascending), 0, new ExecutionConfig)
   }
 
   protected def createSerializer: TypeSerializer[(Int, Long, Double)] = {
     val ti = createTypeInformation[(Int, Long, Double)]
-    ti.createSerializer()
+    ti.createSerializer(new ExecutionConfig)
   }
 
   protected def getSortedTestData: Array[(Int, Long, Double)] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala
index d329571..2fdc087 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.runtime
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.scala._
@@ -28,12 +29,16 @@ class TupleComparatorILD3Test extends TupleComparatorTestBase[(Int, Long, Double
   protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
     val ti = createTypeInformation[(Int, Long, Double)]
     ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
-      .createComparator(Array(0, 1, 2), Array(ascending, ascending, ascending), 0)
+      .createComparator(
+        Array(0, 1, 2),
+        Array(ascending, ascending, ascending),
+        0,
+        new ExecutionConfig)
   }
 
   protected def createSerializer: TypeSerializer[(Int, Long, Double)] = {
     val ti = createTypeInformation[(Int, Long, Double)]
-    ti.createSerializer()
+    ti.createSerializer(new ExecutionConfig)
   }
 
   protected def getSortedTestData: Array[(Int, Long, Double)] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala
index de7affd..34e0306 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.runtime
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.scala._
@@ -28,12 +29,16 @@ class TupleComparatorILDC3Test extends TupleComparatorTestBase[(Int, Long, Doubl
   protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
     val ti = createTypeInformation[(Int, Long, Double)]
     ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
-      .createComparator(Array(2, 0, 1), Array(ascending, ascending, ascending), 0)
+      .createComparator(
+        Array(2, 0, 1),
+        Array(ascending, ascending, ascending),
+        0,
+        new ExecutionConfig)
   }
 
   protected def createSerializer: TypeSerializer[(Int, Long, Double)] = {
     val ti = createTypeInformation[(Int, Long, Double)]
-    ti.createSerializer()
+    ti.createSerializer(new ExecutionConfig)
   }
 
   protected def getSortedTestData: Array[(Int, Long, Double)] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala
index eebd56f..27d8296 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.runtime
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
@@ -28,12 +29,12 @@ class TupleComparatorILDX1Test extends TupleComparatorTestBase[(Int, Long, Doubl
   protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
     val ti = createTypeInformation[(Int, Long, Double)]
     ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
-      .createComparator(Array(1), Array(ascending), 0)
+      .createComparator(Array(1), Array(ascending), 0, new ExecutionConfig)
   }
 
   protected def createSerializer: TypeSerializer[(Int, Long, Double)] = {
     val ti = createTypeInformation[(Int, Long, Double)]
-    ti.createSerializer()
+    ti.createSerializer(new ExecutionConfig)
   }
 
   protected def getSortedTestData: Array[(Int, Long, Double)] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala
index c83ab8b..8231d46 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.runtime
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
@@ -28,12 +29,12 @@ class TupleComparatorILDXC2Test extends TupleComparatorTestBase[(Int, Long, Doub
   protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
     val ti = createTypeInformation[(Int, Long, Double)]
     ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
-      .createComparator(Array(2, 1), Array(ascending, ascending), 0)
+      .createComparator(Array(2, 1), Array(ascending, ascending), 0, new ExecutionConfig)
   }
 
   protected def createSerializer: TypeSerializer[(Int, Long, Double)] = {
     val ti = createTypeInformation[(Int, Long, Double)]
-    ti.createSerializer()
+    ti.createSerializer(new ExecutionConfig)
   }
 
   protected def getSortedTestData: Array[(Int, Long, Double)] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala
index 03c9666..1e1399e 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.runtime
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.scala._
@@ -27,12 +28,12 @@ class TupleComparatorISD1Test extends TupleComparatorTestBase[(Int, String, Doub
   protected def createComparator(ascending: Boolean): TypeComparator[(Int, String, Double)] = {
     val ti = createTypeInformation[(Int, String, Double)]
     ti.asInstanceOf[TupleTypeInfoBase[(Int, String, Double)]]
-      .createComparator(Array(0), Array(ascending),0)
+      .createComparator(Array(0), Array(ascending), 0, new ExecutionConfig)
   }
 
   protected def createSerializer: TypeSerializer[(Int, String, Double)] = {
     val ti = createTypeInformation[(Int, String, Double)]
-    ti.createSerializer()
+    ti.createSerializer(new ExecutionConfig)
   }
 
   protected def getSortedTestData: Array[(Int, String, Double)] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD2Test.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD2Test.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD2Test.scala
index 9a3b9f9..eb905bd 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD2Test.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD2Test.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.runtime
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.scala._
@@ -27,12 +28,12 @@ class TupleComparatorISD2Test extends TupleComparatorTestBase[(Int, String, Doub
   protected def createComparator(ascending: Boolean): TypeComparator[(Int, String, Double)] = {
     val ti = createTypeInformation[(Int, String, Double)]
     ti.asInstanceOf[TupleTypeInfoBase[(Int, String, Double)]]
-      .createComparator(Array(0, 1), Array(ascending, ascending), 0)
+      .createComparator(Array(0, 1), Array(ascending, ascending), 0, new ExecutionConfig)
   }
 
   protected def createSerializer: TypeSerializer[(Int, String, Double)] = {
     val ti = createTypeInformation[(Int, String, Double)]
-    ti.createSerializer()
+    ti.createSerializer(new ExecutionConfig)
   }
 
   protected def getSortedTestData: Array[(Int, String, Double)] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD3Test.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD3Test.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD3Test.scala
index 01b4f3e..d7ff16a 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD3Test.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD3Test.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.runtime
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.scala._
@@ -27,12 +28,16 @@ class TupleComparatorISD3Test extends TupleComparatorTestBase[(Int, String, Doub
   protected def createComparator(ascending: Boolean): TypeComparator[(Int, String, Double)] = {
     val ti = createTypeInformation[(Int, String, Double)]
     ti.asInstanceOf[TupleTypeInfoBase[(Int, String, Double)]]
-      .createComparator(Array(0, 1, 2), Array(ascending, ascending, ascending), 0)
+      .createComparator(
+        Array(0, 1, 2),
+        Array(ascending, ascending, ascending),
+        0,
+        new ExecutionConfig)
   }
 
   protected def createSerializer: TypeSerializer[(Int, String, Double)] = {
     val ti = createTypeInformation[(Int, String, Double)]
-    ti.createSerializer()
+    ti.createSerializer(new ExecutionConfig)
   }
 
   protected def getSortedTestData: Array[(Int, String, Double)] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
index 29e13ec..9371604 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
@@ -18,6 +18,8 @@
 package org.apache.flink.api.scala.runtime
 
 import java.util
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.java.ExecutionEnvironment
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest._
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -28,7 +30,6 @@ import org.junit.Test
 import org.apache.flink.api.scala._
 import scala.collection.JavaConverters._
 import java.util.Random
-import org.apache.flink.api.java.typeutils.runtime.KryoSerializer
 
 class TupleSerializerTest {
 
@@ -102,8 +103,6 @@ class TupleSerializerTest {
       (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)),
       (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)))
       
-    KryoSerializer.registerSerializer(classOf[LocalDate], new LocalDateSerializer())
-    
     runTests(testTuples)
   }
 
@@ -192,8 +191,11 @@ class TupleSerializerTest {
 
   private final def runTests[T <: Product : TypeInformation](instances: Array[T]) {
     try {
+      // Register the custom Kryo Serializer
+      val conf = new ExecutionConfig
+      conf.registerKryoSerializer(classOf[LocalDate], classOf[LocalDateSerializer])
       val tupleTypeInfo = implicitly[TypeInformation[T]].asInstanceOf[TupleTypeInfoBase[T]]
-      val serializer = tupleTypeInfo.createSerializer
+      val serializer = tupleTypeInfo.createSerializer(conf)
       val tupleClass = tupleTypeInfo.getTypeClass
       val test = new TupleSerializerTestInstance[T](serializer, tupleClass, -1, instances)
       test.testAll()

http://git-wip-us.apache.org/repos/asf/flink/blob/7407076d/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
index 89d7c5e..08ba49d 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.api.scala.types
 import java.io.DataInput
 import java.io.DataOutput
 import org.apache.flink.api.common.typeinfo._
+import org.apache.flink.api.common.typeutils._
 import org.apache.flink.api.java.typeutils._
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.types.{IntValue, StringValue}
