[FLINK-1539] [streaming] Remove calls to uninitialized RuntimeContexts
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44702075 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44702075 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44702075 Branch: refs/heads/master Commit: 4470207501ff318bd4d59bad58a5f3fc4ccbfa6f Parents: bb5dc7e Author: Gyula Fora <gyf...@apache.org> Authored: Fri Feb 13 19:57:55 2015 +0100 Committer: mbalassi <mbala...@apache.org> Committed: Mon Feb 16 13:06:08 2015 +0100 ---------------------------------------------------------------------- .../streaming/api/datastream/DataStream.java | 11 ++++---- .../api/datastream/WindowedDataStream.java | 12 ++++----- .../aggregation/ComparableAggregator.java | 14 +++++----- .../api/function/aggregation/SumAggregator.java | 10 +++++--- .../api/function/source/FileSourceFunction.java | 15 +++-------- .../api/invokable/StreamInvokable.java | 5 ++-- .../invokable/operator/ProjectInvokable.java | 4 ++- .../api/invokable/operator/co/CoInvokable.java | 3 +-- .../api/streamvertex/CoStreamVertex.java | 2 +- .../api/streamvertex/StreamTaskContext.java | 5 +++- .../api/streamvertex/StreamVertex.java | 2 +- .../windowing/windowbuffer/WindowBuffer.java | 4 +-- .../flink/streaming/util/MockCoContext.java | 7 ++++- .../flink/streaming/util/MockContext.java | 7 ++++- .../src/test/resources/log4j-test.properties | 10 +++++++- .../src/test/resources/log4j.properties | 27 ++++++++++++++++++++ 16 files changed, 92 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index e766626..71a97f8 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -636,7 +636,8 @@ public class DataStream<OUT> { * @return The transformed DataStream. */ public SingleOutputStreamOperator<OUT, ?> sum(String field) { - return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType())); + return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType(), + getExecutionConfig())); } /** @@ -667,7 +668,7 @@ public class DataStream<OUT> { */ public SingleOutputStreamOperator<OUT, ?> min(String field) { return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MIN, - false)); + false, getExecutionConfig())); } /** @@ -698,7 +699,7 @@ public class DataStream<OUT> { */ public SingleOutputStreamOperator<OUT, ?> max(String field) { return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MAX, - false)); + false, getExecutionConfig())); } /** @@ -718,7 +719,7 @@ public class DataStream<OUT> { */ public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) { return aggregate(ComparableAggregator.getAggregator(field, getType(), - AggregationType.MINBY, first)); + AggregationType.MINBY, first, getExecutionConfig())); } /** @@ -738,7 +739,7 @@ public class DataStream<OUT> { */ public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) { return aggregate(ComparableAggregator.getAggregator(field, getType(), - AggregationType.MAXBY, first)); + AggregationType.MAXBY, first, getExecutionConfig())); } /** 
http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java index 3ff5859..4da12ac 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java @@ -381,7 +381,8 @@ public class WindowedDataStream<OUT> { * @return The transformed DataStream. */ public WindowedDataStream<OUT> sum(String field) { - return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType())); + return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType(), + getExecutionConfig())); } /** @@ -411,7 +412,7 @@ public class WindowedDataStream<OUT> { */ public WindowedDataStream<OUT> min(String field) { return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MIN, - false)); + false, getExecutionConfig())); } /** @@ -475,7 +476,7 @@ public class WindowedDataStream<OUT> { */ public WindowedDataStream<OUT> minBy(String field, boolean first) { return aggregate(ComparableAggregator.getAggregator(field, getType(), - AggregationType.MINBY, first)); + AggregationType.MINBY, first, getExecutionConfig())); } /** @@ -505,7 +506,7 @@ public class WindowedDataStream<OUT> { */ public WindowedDataStream<OUT> max(String field) { return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MAX, - false)); + false, 
getExecutionConfig())); } /** @@ -569,11 +570,10 @@ public class WindowedDataStream<OUT> { */ public WindowedDataStream<OUT> maxBy(String field, boolean first) { return aggregate(ComparableAggregator.getAggregator(field, getType(), - AggregationType.MAXBY, first)); + AggregationType.MAXBY, first, getExecutionConfig())); } private WindowedDataStream<OUT> aggregate(AggregationFunction<OUT> aggregator) { - return reduceWindow(aggregator); } http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java index 7f7cf0b..66da931 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregator.java @@ -21,6 +21,7 @@ import java.lang.reflect.Array; import java.lang.reflect.Field; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -65,9 +66,10 @@ public abstract class ComparableAggregator<T> extends AggregationFunction<T> { } public static <R> AggregationFunction<R> getAggregator(String field, - TypeInformation<R> typeInfo, AggregationType aggregationType, boolean first) { + TypeInformation<R> typeInfo, AggregationType 
aggregationType, boolean first, + ExecutionConfig config) { - return new PojoComparableAggregator<R>(field, typeInfo, aggregationType, first); + return new PojoComparableAggregator<R>(field, typeInfo, aggregationType, first, config); } private static class TupleComparableAggregator<T> extends ComparableAggregator<T> { @@ -177,7 +179,7 @@ public abstract class ComparableAggregator<T> extends AggregationFunction<T> { PojoComparator<T> pojoComparator; public PojoComparableAggregator(String field, TypeInformation<?> typeInfo, - AggregationType aggregationType, boolean first) { + AggregationType aggregationType, boolean first, ExecutionConfig config) { super(0, aggregationType, first); if (!(typeInfo instanceof CompositeType<?>)) { throw new IllegalArgumentException( @@ -193,7 +195,7 @@ public abstract class ComparableAggregator<T> extends AggregationFunction<T> { if (cType instanceof PojoTypeInfo) { pojoComparator = (PojoComparator<T>) cType.createComparator( - new int[] { logicalKeyPosition }, new boolean[] { false }, 0, getRuntimeContext().getExecutionConfig()); + new int[] { logicalKeyPosition }, new boolean[] { false }, 0, config); } else { throw new IllegalArgumentException( "Key expressions are only supported on POJO types. 
" @@ -225,8 +227,8 @@ public abstract class ComparableAggregator<T> extends AggregationFunction<T> { } else { if (c == 1) { keyFields[0].set(value2, field1); - } - + } + return value2; } } http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java index 74e4597..20d4450 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregator.java @@ -21,6 +21,7 @@ import java.lang.reflect.Array; import java.lang.reflect.Field; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; @@ -47,9 +48,10 @@ public abstract class SumAggregator { } - public static <T> ReduceFunction<T> getSumFunction(String field, TypeInformation<T> typeInfo) { + public static <T> ReduceFunction<T> getSumFunction(String field, TypeInformation<T> typeInfo, + ExecutionConfig config) { - return new PojoSumAggregator<T>(field, typeInfo); + return new PojoSumAggregator<T>(field, typeInfo, config); } private static class TupleSumAggregator<T> extends AggregationFunction<T> { @@ -126,7 +128,7 @@ public abstract class SumAggregator { SumFunction adder; PojoComparator<T> comparator; - public 
PojoSumAggregator(String field, TypeInformation<?> type) { + public PojoSumAggregator(String field, TypeInformation<?> type, ExecutionConfig config) { super(0); if (!(type instanceof CompositeType<?>)) { throw new IllegalArgumentException( @@ -146,7 +148,7 @@ public abstract class SumAggregator { if (cType instanceof PojoTypeInfo) { comparator = (PojoComparator<T>) cType.createComparator( - new int[] { logicalKeyPosition }, new boolean[] { false }, 0, getRuntimeContext().getExecutionConfig()); + new int[] { logicalKeyPosition }, new boolean[] { false }, 0, config); } else { throw new IllegalArgumentException( "Key expressions are only supported on POJO types. " http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java index 6d1441a..20f5f56 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java @@ -23,8 +23,6 @@ import java.util.NoSuchElementException; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.configuration.Configuration; import 
org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; @@ -38,17 +36,11 @@ public class FileSourceFunction extends RichSourceFunction<String> { private InputFormat<String, ?> inputFormat; - private TypeSerializerFactory<String> serializerFactory; + private TypeInformation<String> typeInfo; public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) { this.inputFormat = format; - this.serializerFactory = createSerializer(typeInfo); - } - - private TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) { - TypeSerializer<String> serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig()); - - return new RuntimeSerializerFactory<String>(serializer, typeInfo.getTypeClass()); + this.typeInfo = typeInfo; } @Override @@ -60,7 +52,8 @@ public class FileSourceFunction extends RichSourceFunction<String> { @Override public void invoke(Collector<String> collector) throws Exception { - final TypeSerializer<String> serializer = serializerFactory.getSerializer(); + final TypeSerializer<String> serializer = typeInfo.createSerializer(getRuntimeContext() + .getExecutionConfig()); final Iterator<InputSplit> splitIterator = getInputSplits(); @SuppressWarnings("unchecked") final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.inputFormat; http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java index 7feeac8..733edc7 100644 --- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java @@ -73,9 +73,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { * * @param taskContext * StreamTaskContext representing the vertex - * @param executionConfig */ - public void setup(StreamTaskContext<OUT> taskContext, ExecutionConfig executionConfig) { + public void setup(StreamTaskContext<OUT> taskContext) { this.collector = taskContext.getOutputCollector(); this.recordIterator = taskContext.getIndexedInput(0); this.inSerializer = taskContext.getInputSerializer(0); @@ -84,7 +83,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { this.objectSerializer = inSerializer.getObjectSerializer(); } this.taskContext = taskContext; - this.executionConfig = executionConfig; + this.executionConfig = taskContext.getExecutionConfig(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java index 69c7cee..31689c7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java @@ -28,6 +28,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> 
extends StreamInvokable<IN, transient OUT outTuple; TypeSerializer<OUT> outTypeSerializer; + TypeInformation<OUT> outTypeInformation; int[] fields; int numFields; @@ -35,7 +36,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN, super(null); this.fields = fields; this.numFields = this.fields.length; - this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig); + this.outTypeInformation = outTypeInformation; } @Override @@ -56,6 +57,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN, @Override public void open(Configuration config) throws Exception { super.open(config); + this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig); outTuple = outTypeSerializer.createInstance(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java index 9f98db3..604873e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.api.invokable.operator.co; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.invokable.StreamInvokable; @@ -47,7 +46,7 @@ public 
abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU protected TypeSerializer<IN2> serializer2; @Override - public void setup(StreamTaskContext<OUT> taskContext, ExecutionConfig executionConfig) { + public void setup(StreamTaskContext<OUT> taskContext) { this.collector = taskContext.getOutputCollector(); this.recordIterator = taskContext.getCoReader(); http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java index 7b6e75e..de4660a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java @@ -69,7 +69,7 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> { @Override protected void setInvokable() { userInvokable = configuration.getUserInvokable(userClassLoader); - userInvokable.setup(this, getExecutionConfig()); + userInvokable.setup(this); } protected void setConfigInputs() throws StreamVertexException { http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java index 665decd..1c904ca 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.streamvertex; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.StreamConfig; import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.io.CoReaderIterator; @@ -32,7 +33,7 @@ public interface StreamTaskContext<OUT> { ClassLoader getUserCodeClassLoader(); <X> MutableObjectIterator<X> getInput(int index); - + <X> IndexedReaderIterator<X> getIndexedInput(int index); <X> StreamRecordSerializer<X> getInputSerializer(int index); @@ -40,4 +41,6 @@ public interface StreamTaskContext<OUT> { Collector<OUT> getOutputCollector(); <X, Y> CoReaderIterator<X, Y> getCoReader(); + + ExecutionConfig getExecutionConfig(); } http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java index 91ffec1..5033357 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java @@ -98,7 +98,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa protected void setInvokable() { userInvokable = configuration.getUserInvokable(userClassLoader); - userInvokable.setup(this, getExecutionConfig()); + userInvokable.setup(this); } public String getName() { http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java index 59abcd6..1a45194 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java @@ -22,7 +22,7 @@ import java.io.Serializable; import org.apache.flink.streaming.api.windowing.StreamWindow; import org.apache.flink.util.Collector; -public interface WindowBuffer<T> extends Serializable { +public interface WindowBuffer<T> extends Serializable, Cloneable { public void store(T element) throws Exception; @@ -31,7 +31,7 @@ public interface WindowBuffer<T> extends Serializable { public boolean emitWindow(Collector<StreamWindow<T>> collector); public int size(); - + public WindowBuffer<T> clone(); } http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java 
---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java index a0d08f1..98a6a8d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java @@ -156,7 +156,7 @@ public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> { public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1, IN2, OUT> invokable, List<IN1> input1, List<IN2> input2) { MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2); - invokable.setup(mockContext, new ExecutionConfig()); + invokable.setup(mockContext); try { invokable.open(null); @@ -222,4 +222,9 @@ public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> { "Indexed iterator is currently unsupported for connected streams."); } + @Override + public ExecutionConfig getExecutionConfig() { + return new ExecutionConfig(); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java index f3d977f..03038b3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java @@ -112,7 +112,7 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT> { public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT> invokable, List<IN> inputs) { MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs); - invokable.setup(mockContext, new ExecutionConfig()); + invokable.setup(mockContext); try { invokable.open(null); invokable.invoke(); @@ -170,4 +170,9 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT> { return (IndexedReaderIterator<X>) iterator; } + @Override + public ExecutionConfig getExecutionConfig() { + return new ExecutionConfig(); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties index 2fb9345..0b686e5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties @@ -16,4 +16,12 @@ # limitations under the License. ################################################################################ -log4j.rootLogger=OFF \ No newline at end of file +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=OFF, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. 
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/44702075/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties new file mode 100644 index 0000000..ed2bbcb --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +# This file ensures that tests executed from the IDE show log output + +log4j.rootLogger=OFF, console + +# Log all infos in the given file +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target = System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n \ No newline at end of file