[FLINK-1539] [streaming] Remove calls to uninitalized runtimecontexts

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44702075
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44702075
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44702075

Branch: refs/heads/master
Commit: 4470207501ff318bd4d59bad58a5f3fc4ccbfa6f
Parents: bb5dc7e
Author: Gyula Fora <gyf...@apache.org>
Authored: Fri Feb 13 19:57:55 2015 +0100
Committer: mbalassi <mbala...@apache.org>
Committed: Mon Feb 16 13:06:08 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 11 ++++----
 .../api/datastream/WindowedDataStream.java      | 12 ++++-----
 .../aggregation/ComparableAggregator.java       | 14 +++++-----
 .../api/function/aggregation/SumAggregator.java | 10 +++++---
 .../api/function/source/FileSourceFunction.java | 15 +++--------
 .../api/invokable/StreamInvokable.java          |  5 ++--
 .../invokable/operator/ProjectInvokable.java    |  4 ++-
 .../api/invokable/operator/co/CoInvokable.java  |  3 +--
 .../api/streamvertex/CoStreamVertex.java        |  2 +-
 .../api/streamvertex/StreamTaskContext.java     |  5 +++-
 .../api/streamvertex/StreamVertex.java          |  2 +-
 .../windowing/windowbuffer/WindowBuffer.java    |  4 +--
 .../flink/streaming/util/MockCoContext.java     |  7 ++++-
 .../flink/streaming/util/MockContext.java       |  7 ++++-
 .../src/test/resources/log4j-test.properties    | 10 +++++++-
 .../src/test/resources/log4j.properties         | 27 ++++++++++++++++++++
 16 files changed, 92 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index e766626..71a97f8 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -636,7 +636,8 @@ public class DataStream<OUT> {
         * @return The transformed DataStream.
         */
        public SingleOutputStreamOperator<OUT, ?> sum(String field) {
-               return aggregate((AggregationFunction<OUT>) 
SumAggregator.getSumFunction(field, getType()));
+               return aggregate((AggregationFunction<OUT>) 
SumAggregator.getSumFunction(field, getType(),
+                               getExecutionConfig()));
        }
 
        /**
@@ -667,7 +668,7 @@ public class DataStream<OUT> {
         */
        public SingleOutputStreamOperator<OUT, ?> min(String field) {
                return aggregate(ComparableAggregator.getAggregator(field, 
getType(), AggregationType.MIN,
-                               false));
+                               false, getExecutionConfig()));
        }
 
        /**
@@ -698,7 +699,7 @@ public class DataStream<OUT> {
         */
        public SingleOutputStreamOperator<OUT, ?> max(String field) {
                return aggregate(ComparableAggregator.getAggregator(field, 
getType(), AggregationType.MAX,
-                               false));
+                               false, getExecutionConfig()));
        }
 
        /**
@@ -718,7 +719,7 @@ public class DataStream<OUT> {
         */
        public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean 
first) {
                return aggregate(ComparableAggregator.getAggregator(field, 
getType(),
-                               AggregationType.MINBY, first));
+                               AggregationType.MINBY, first, 
getExecutionConfig()));
        }
 
        /**
@@ -738,7 +739,7 @@ public class DataStream<OUT> {
         */
        public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean 
first) {
                return aggregate(ComparableAggregator.getAggregator(field, 
getType(),
-                               AggregationType.MAXBY, first));
+                               AggregationType.MAXBY, first, 
getExecutionConfig()));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 3ff5859..4da12ac 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -381,7 +381,8 @@ public class WindowedDataStream<OUT> {
         * @return The transformed DataStream.
         */
        public WindowedDataStream<OUT> sum(String field) {
-               return aggregate((AggregationFunction<OUT>) 
SumAggregator.getSumFunction(field, getType()));
+               return aggregate((AggregationFunction<OUT>) 
SumAggregator.getSumFunction(field, getType(),
+                               getExecutionConfig()));
        }
 
        /**
@@ -411,7 +412,7 @@ public class WindowedDataStream<OUT> {
         */
        public WindowedDataStream<OUT> min(String field) {
                return aggregate(ComparableAggregator.getAggregator(field, 
getType(), AggregationType.MIN,
-                               false));
+                               false, getExecutionConfig()));
        }
 
        /**
@@ -475,7 +476,7 @@ public class WindowedDataStream<OUT> {
         */
        public WindowedDataStream<OUT> minBy(String field, boolean first) {
                return aggregate(ComparableAggregator.getAggregator(field, 
getType(),
-                               AggregationType.MINBY, first));
+                               AggregationType.MINBY, first, 
getExecutionConfig()));
        }
 
        /**
@@ -505,7 +506,7 @@ public class WindowedDataStream<OUT> {
         */
        public WindowedDataStream<OUT> max(String field) {
                return aggregate(ComparableAggregator.getAggregator(field, 
getType(), AggregationType.MAX,
-                               false));
+                               false, getExecutionConfig()));
        }
 
        /**
@@ -569,11 +570,10 @@ public class WindowedDataStream<OUT> {
         */
        public WindowedDataStream<OUT> maxBy(String field, boolean first) {
                return aggregate(ComparableAggregator.getAggregator(field, 
getType(),
-                               AggregationType.MAXBY, first));
+                               AggregationType.MAXBY, first, 
getExecutionConfig()));
        }
 
        private WindowedDataStream<OUT> aggregate(AggregationFunction<OUT> 
aggregator) {
-
                return reduceWindow(aggregator);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
index 7f7cf0b..66da931 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java
@@ -21,6 +21,7 @@ import java.lang.reflect.Array;
 import java.lang.reflect.Field;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -65,9 +66,10 @@ public abstract class ComparableAggregator<T> extends 
AggregationFunction<T> {
        }
 
        public static <R> AggregationFunction<R> getAggregator(String field,
-                       TypeInformation<R> typeInfo, AggregationType 
aggregationType, boolean first) {
+                       TypeInformation<R> typeInfo, AggregationType 
aggregationType, boolean first,
+                       ExecutionConfig config) {
 
-               return new PojoComparableAggregator<R>(field, typeInfo, 
aggregationType, first);
+               return new PojoComparableAggregator<R>(field, typeInfo, 
aggregationType, first, config);
        }
 
        private static class TupleComparableAggregator<T> extends 
ComparableAggregator<T> {
@@ -177,7 +179,7 @@ public abstract class ComparableAggregator<T> extends 
AggregationFunction<T> {
                PojoComparator<T> pojoComparator;
 
                public PojoComparableAggregator(String field, 
TypeInformation<?> typeInfo,
-                               AggregationType aggregationType, boolean first) 
{
+                               AggregationType aggregationType, boolean first, 
ExecutionConfig config) {
                        super(0, aggregationType, first);
                        if (!(typeInfo instanceof CompositeType<?>)) {
                                throw new IllegalArgumentException(
@@ -193,7 +195,7 @@ public abstract class ComparableAggregator<T> extends 
AggregationFunction<T> {
 
                        if (cType instanceof PojoTypeInfo) {
                                pojoComparator = (PojoComparator<T>) 
cType.createComparator(
-                                               new int[] { logicalKeyPosition 
}, new boolean[] { false }, 0, getRuntimeContext().getExecutionConfig());
+                                               new int[] { logicalKeyPosition 
}, new boolean[] { false }, 0, config);
                        } else {
                                throw new IllegalArgumentException(
                                                "Key expressions are only 
supported on POJO types. "
@@ -225,8 +227,8 @@ public abstract class ComparableAggregator<T> extends 
AggregationFunction<T> {
                        } else {
                                if (c == 1) {
                                        keyFields[0].set(value2, field1);
-                               } 
-                               
+                               }
+
                                return value2;
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
index 74e4597..20d4450 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java
@@ -21,6 +21,7 @@ import java.lang.reflect.Array;
 import java.lang.reflect.Field;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
@@ -47,9 +48,10 @@ public abstract class SumAggregator {
 
        }
 
-       public static <T> ReduceFunction<T> getSumFunction(String field, 
TypeInformation<T> typeInfo) {
+       public static <T> ReduceFunction<T> getSumFunction(String field, 
TypeInformation<T> typeInfo,
+                       ExecutionConfig config) {
 
-               return new PojoSumAggregator<T>(field, typeInfo);
+               return new PojoSumAggregator<T>(field, typeInfo, config);
        }
 
        private static class TupleSumAggregator<T> extends 
AggregationFunction<T> {
@@ -126,7 +128,7 @@ public abstract class SumAggregator {
                SumFunction adder;
                PojoComparator<T> comparator;
 
-               public PojoSumAggregator(String field, TypeInformation<?> type) 
{
+               public PojoSumAggregator(String field, TypeInformation<?> type, 
ExecutionConfig config) {
                        super(0);
                        if (!(type instanceof CompositeType<?>)) {
                                throw new IllegalArgumentException(
@@ -146,7 +148,7 @@ public abstract class SumAggregator {
 
                        if (cType instanceof PojoTypeInfo) {
                                comparator = (PojoComparator<T>) 
cType.createComparator(
-                                               new int[] { logicalKeyPosition 
}, new boolean[] { false }, 0, getRuntimeContext().getExecutionConfig());
+                                               new int[] { logicalKeyPosition 
}, new boolean[] { false }, 0, config);
                        } else {
                                throw new IllegalArgumentException(
                                                "Key expressions are only 
supported on POJO types. "

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index 6d1441a..20f5f56 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -23,8 +23,6 @@ import java.util.NoSuchElementException;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -38,17 +36,11 @@ public class FileSourceFunction extends 
RichSourceFunction<String> {
 
        private InputFormat<String, ?> inputFormat;
 
-       private TypeSerializerFactory<String> serializerFactory;
+       private TypeInformation<String> typeInfo;
 
        public FileSourceFunction(InputFormat<String, ?> format, 
TypeInformation<String> typeInfo) {
                this.inputFormat = format;
-               this.serializerFactory = createSerializer(typeInfo);
-       }
-
-       private TypeSerializerFactory<String> 
createSerializer(TypeInformation<String> typeInfo) {
-               TypeSerializer<String> serializer = 
typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
-
-               return new RuntimeSerializerFactory<String>(serializer, 
typeInfo.getTypeClass());
+               this.typeInfo = typeInfo;
        }
 
        @Override
@@ -60,7 +52,8 @@ public class FileSourceFunction extends 
RichSourceFunction<String> {
 
        @Override
        public void invoke(Collector<String> collector) throws Exception {
-               final TypeSerializer<String> serializer = 
serializerFactory.getSerializer();
+               final TypeSerializer<String> serializer = 
typeInfo.createSerializer(getRuntimeContext()
+                               .getExecutionConfig());
                final Iterator<InputSplit> splitIterator = getInputSplits();
                @SuppressWarnings("unchecked")
                final InputFormat<String, InputSplit> format = 
(InputFormat<String, InputSplit>) this.inputFormat;

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index 7feeac8..733edc7 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -73,9 +73,8 @@ public abstract class StreamInvokable<IN, OUT> implements 
Serializable {
         * 
         * @param taskContext
         *            StreamTaskContext representing the vertex
-        * @param executionConfig
         */
-       public void setup(StreamTaskContext<OUT> taskContext, ExecutionConfig 
executionConfig) {
+       public void setup(StreamTaskContext<OUT> taskContext) {
                this.collector = taskContext.getOutputCollector();
                this.recordIterator = taskContext.getIndexedInput(0);
                this.inSerializer = taskContext.getInputSerializer(0);
@@ -84,7 +83,7 @@ public abstract class StreamInvokable<IN, OUT> implements 
Serializable {
                        this.objectSerializer = 
inSerializer.getObjectSerializer();
                }
                this.taskContext = taskContext;
-               this.executionConfig = executionConfig;
+               this.executionConfig = taskContext.getExecutionConfig();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
index 69c7cee..31689c7 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -28,6 +28,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends 
StreamInvokable<IN,
 
        transient OUT outTuple;
        TypeSerializer<OUT> outTypeSerializer;
+       TypeInformation<OUT> outTypeInformation;
        int[] fields;
        int numFields;
 
@@ -35,7 +36,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends 
StreamInvokable<IN,
                super(null);
                this.fields = fields;
                this.numFields = this.fields.length;
-               this.outTypeSerializer = 
outTypeInformation.createSerializer(executionConfig);
+               this.outTypeInformation = outTypeInformation;
        }
 
        @Override
@@ -56,6 +57,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends 
StreamInvokable<IN,
        @Override
        public void open(Configuration config) throws Exception {
                super.open(config);
+               this.outTypeSerializer = 
outTypeInformation.createSerializer(executionConfig);
                outTuple = outTypeSerializer.createInstance();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 9f98db3..604873e 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.invokable.operator.co;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
@@ -47,7 +46,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends 
StreamInvokable<IN1, OU
        protected TypeSerializer<IN2> serializer2;
 
        @Override
-       public void setup(StreamTaskContext<OUT> taskContext, ExecutionConfig 
executionConfig) {
+       public void setup(StreamTaskContext<OUT> taskContext) {
                this.collector = taskContext.getOutputCollector();
 
                this.recordIterator = taskContext.getCoReader();

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index 7b6e75e..de4660a 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -69,7 +69,7 @@ public class CoStreamVertex<IN1, IN2, OUT> extends 
StreamVertex<IN1, OUT> {
        @Override
        protected void setInvokable() {
                userInvokable = configuration.getUserInvokable(userClassLoader);
-               userInvokable.setup(this, getExecutionConfig());
+               userInvokable.setup(this);
        }
 
        protected void setConfigInputs() throws StreamVertexException {

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
index 665decd..1c904ca 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.streamvertex;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.CoReaderIterator;
@@ -32,7 +33,7 @@ public interface StreamTaskContext<OUT> {
        ClassLoader getUserCodeClassLoader();
 
        <X> MutableObjectIterator<X> getInput(int index);
-       
+
        <X> IndexedReaderIterator<X> getIndexedInput(int index);
 
        <X> StreamRecordSerializer<X> getInputSerializer(int index);
@@ -40,4 +41,6 @@ public interface StreamTaskContext<OUT> {
        Collector<OUT> getOutputCollector();
 
        <X, Y> CoReaderIterator<X, Y> getCoReader();
+
+       ExecutionConfig getExecutionConfig();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 91ffec1..5033357 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -98,7 +98,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable 
implements StreamTa
 
        protected void setInvokable() {
                userInvokable = configuration.getUserInvokable(userClassLoader);
-               userInvokable.setup(this, getExecutionConfig());
+               userInvokable.setup(this);
        }
 
        public String getName() {

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
index 59abcd6..1a45194 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.util.Collector;
 
-public interface WindowBuffer<T> extends Serializable {
+public interface WindowBuffer<T> extends Serializable, Cloneable {
 
        public void store(T element) throws Exception;
 
@@ -31,7 +31,7 @@ public interface WindowBuffer<T> extends Serializable {
        public boolean emitWindow(Collector<StreamWindow<T>> collector);
 
        public int size();
-       
+
        public WindowBuffer<T> clone();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
index a0d08f1..98a6a8d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
@@ -156,7 +156,7 @@ public class MockCoContext<IN1, IN2, OUT> implements 
StreamTaskContext<OUT> {
        public static <IN1, IN2, OUT> List<OUT> 
createAndExecute(CoInvokable<IN1, IN2, OUT> invokable,
                        List<IN1> input1, List<IN2> input2) {
                MockCoContext<IN1, IN2, OUT> mockContext = new 
MockCoContext<IN1, IN2, OUT>(input1, input2);
-               invokable.setup(mockContext, new ExecutionConfig());
+               invokable.setup(mockContext);
 
                try {
                        invokable.open(null);
@@ -222,4 +222,9 @@ public class MockCoContext<IN1, IN2, OUT> implements 
StreamTaskContext<OUT> {
                                "Indexed iterator is currently unsupported for 
connected streams.");
        }
 
+       @Override
+       public ExecutionConfig getExecutionConfig() {
+               return new ExecutionConfig();
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index f3d977f..03038b3 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -112,7 +112,7 @@ public class MockContext<IN, OUT> implements 
StreamTaskContext<OUT> {
        public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, 
OUT> invokable,
                        List<IN> inputs) {
                MockContext<IN, OUT> mockContext = new MockContext<IN, 
OUT>(inputs);
-               invokable.setup(mockContext, new ExecutionConfig());
+               invokable.setup(mockContext);
                try {
                        invokable.open(null);
                        invokable.invoke();
@@ -170,4 +170,9 @@ public class MockContext<IN, OUT> implements 
StreamTaskContext<OUT> {
                return (IndexedReaderIterator<X>) iterator;
        }
 
+       @Override
+       public ExecutionConfig getExecutionConfig() {
+               return new ExecutionConfig();
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
index 2fb9345..0b686e5 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
@@ -16,4 +16,12 @@
 # limitations under the License.
 
################################################################################
 
-log4j.rootLogger=OFF
\ No newline at end of file
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ed2bbcb
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=OFF, console
+
+# Log all infos in the given file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target = System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n
\ No newline at end of file

Reply via email to