Repository: flink
Updated Branches:
  refs/heads/master 47b5cb795 -> f9eea5e5a


[FLINK-2674] Add Fold Window Operation for new Windowing API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9eea5e5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9eea5e5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9eea5e5

Branch: refs/heads/master
Commit: f9eea5e5a7d13d844e06d3eb6849268b3eaf4c08
Parents: ce792b1
Author: Aljoscha Krettek <[email protected]>
Authored: Wed Oct 7 16:49:40 2015 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Oct 7 22:08:25 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |  34 ++++
 .../api/datastream/WindowedStream.java          |  34 ++++
 .../windowing/FoldAllWindowFunction.java        |  97 ++++++++++
 .../functions/windowing/FoldWindowFunction.java |  97 ++++++++++
 .../windowing/NonKeyedWindowOperator.java       |  12 +-
 .../operators/windowing/WindowOperator.java     |  12 +-
 .../operators/windowing/WindowFoldITCase.java   | 191 +++++++++++++++++++
 .../streaming/api/scala/AllWindowedStream.scala |  43 ++++-
 .../streaming/api/scala/WindowedStream.scala    |  43 ++++-
 .../streaming/api/scala/WindowFoldITCase.scala  | 148 ++++++++++++++
 10 files changed, 707 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index a8d7654..c7a70d7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.commons.lang.SerializationUtils;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -29,6 +30,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.windowing.FoldAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -164,6 +166,38 @@ public class AllWindowedStream<T, W extends Window> {
 	}
 
 	/**
+	 * Applies the given fold function to each window. Every evaluation of the window folds its
+	 * elements, starting from a copy of the initial value; the results form a non-windowed stream.
+	 *
+	 * @param initialValue The initial value of the fold.
+	 * @param function The fold function.
+	 * @return The data stream that is the result of applying the fold function to the window.
+	 */
+	public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> function) {
+		//clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+
+		TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
+				Utils.getCallLocationName(), true);
+
+		return apply(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType);
+	}
+
+	/**
+	 * Like {@link #fold(Object, FoldFunction)}, but with an explicitly given result type.
+	 *
+	 * @param initialValue The initial value of the fold.
+	 * @param function The fold function.
+	 * @param resultType The type information of the fold result.
+	 * @return The data stream that is the result of applying the fold function to the window.
+	 */
+	public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
+		//clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+		return apply(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType);
+	}
+
+	/**
 	 * Applies a window function to the window. The window function is called for each evaluation
 	 * of the window for each key individually. The output of the window function is interpreted
 	 * as a regular non-windowed stream.
http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 99f7d06..42e0bd7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.commons.lang.SerializationUtils;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -30,6 +31,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.windowing.FoldWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -178,6 +180,38 @@ public class WindowedStream<T, K, W extends Window> {
 	}
 
 	/**
+	 * Applies the given fold function to each window. Each evaluation of the window, for each key
+	 * individually, folds the elements starting from a copy of the initial value.
+	 *
+	 * @param initialValue The initial value of the fold.
+	 * @param function The fold function.
+	 * @return The data stream that is the result of applying the fold function to the window.
+	 */
+	public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> function) {
+		//clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+
+		TypeInformation<R> resultType = TypeExtractor.getFoldReturnTypes(function, input.getType(),
+				Utils.getCallLocationName(), true);
+
+		return apply(new FoldWindowFunction<K, W, T, R>(initialValue, function), resultType);
+	}
+
+	/**
+	 * Like {@link #fold(Object, FoldFunction)}, but with an explicitly given result type.
+	 *
+	 * @param initialValue The initial value of the fold.
+	 * @param function The fold function.
+	 * @param resultType The type information of the fold result.
+	 * @return The data stream that is the result of applying the fold function to the window.
+	 */
+	public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
+		//clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+		return apply(new FoldWindowFunction<K, W, T, R>(initialValue, function), resultType);
+	}
+
+	/**
 	 * Applies the given window function to each window. The window function is called for each
 	 * evaluation of the window for each key individually. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
new file mode 100644
index 0000000..69f24fe
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldAllWindowFunction.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * An {@link AllWindowFunction} that folds the elements of each window with a {@link FoldFunction}.
+ */
+public class FoldAllWindowFunction<W extends Window, T, R>
+		extends WrappingFunction<FoldFunction<T, R>>
+		implements AllWindowFunction<T, R, W>, OutputTypeConfigurable<R> {
+	private static final long serialVersionUID = 1L;
+
+	private byte[] serializedInitialValue; // written by setOutputType(), read back in open()
+	private TypeSerializer<R> outSerializer;
+	private transient R initialValue; // restored from serializedInitialValue in open()
+
+	public FoldAllWindowFunction(R initialValue, FoldFunction<T, R> foldFunction) {
+		super(foldFunction);
+		this.initialValue = initialValue;
+	}
+
+	@Override
+	public void open(Configuration configuration) throws Exception {
+		super.open(configuration);
+
+		if (serializedInitialValue == null) {
+			throw new RuntimeException("No initial value was serialized for the fold " +
+				"window function. Probably the setOutputType method was not called.");
+		}
+
+		InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
+			new DataInputStream(new ByteArrayInputStream(serializedInitialValue)));
+		initialValue = outSerializer.deserialize(in);
+	}
+
+	@Override
+	public void apply(W window, Iterable<T> values, Collector<R> out) throws Exception {
+		// copy, because a fold function may mutate and return the accumulator it is given
+		R result = outSerializer.copy(initialValue);
+		for (T val: values) {
+			result = wrappedFunction.fold(result, val);
+		}
+
+		out.collect(result);
+	}
+
+	@Override
+	public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) {
+		outSerializer = outTypeInfo.createSerializer(executionConfig);
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		OutputViewDataOutputStreamWrapper out =
+			new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
+
+		try {
+			outSerializer.serialize(initialValue, out);
+		} catch (IOException ioe) {
+			throw new RuntimeException("Unable to serialize initial value of type " +
+				outTypeInfo + " of fold window function.", ioe);
+		}
+
+		serializedInitialValue = baos.toByteArray();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
new file mode 100644
index 0000000..04d2ac7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldWindowFunction.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * A {@link WindowFunction} that folds the elements of each window with a {@link FoldFunction}.
+ */
+public class FoldWindowFunction<K, W extends Window, T, R>
+		extends WrappingFunction<FoldFunction<T, R>>
+		implements WindowFunction<T, R, K, W>, OutputTypeConfigurable<R> {
+	private static final long serialVersionUID = 1L;
+
+	private byte[] serializedInitialValue; // written by setOutputType(), read back in open()
+	private TypeSerializer<R> outSerializer;
+	private transient R initialValue; // restored from serializedInitialValue in open()
+
+	public FoldWindowFunction(R initialValue, FoldFunction<T, R> foldFunction) {
+		super(foldFunction);
+		this.initialValue = initialValue;
+	}
+
+	@Override
+	public void open(Configuration configuration) throws Exception {
+		super.open(configuration);
+
+		if (serializedInitialValue == null) {
+			throw new RuntimeException("No initial value was serialized for the fold " +
+				"window function. Probably the setOutputType method was not called.");
+		}
+
+		InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
+			new DataInputStream(new ByteArrayInputStream(serializedInitialValue)));
+		initialValue = outSerializer.deserialize(in);
+	}
+
+	@Override
+	public void apply(K k, W window, Iterable<T> values, Collector<R> out) throws Exception {
+		// copy, because a fold function may mutate and return the accumulator it is given
+		R result = outSerializer.copy(initialValue);
+		for (T val: values) {
+			result = wrappedFunction.fold(result, val);
+		}
+
+		out.collect(result);
+	}
+
+	@Override
+	public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) {
+		outSerializer = outTypeInfo.createSerializer(executionConfig);
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		OutputViewDataOutputStreamWrapper out =
+			new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
+
+		try {
+			outSerializer.serialize(initialValue, out);
+		} catch (IOException ioe) {
+			throw new RuntimeException("Unable to serialize initial value of type " +
+				outTypeInfo + " of fold window function.", ioe);
+		}
+
+		serializedInitialValue = baos.toByteArray();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff
--git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index a80242d..e6aa53b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -56,7 +57,7 @@ import java.util.Set;
  */
 public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, OUT, W>>
-		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
+		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable, OutputTypeConfigurable<OUT> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -268,6 +269,15 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		return this;
 	}
 
+	@Override
+	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+		if (userFunction instanceof OutputTypeConfigurable) {
+			@SuppressWarnings("unchecked")
+			OutputTypeConfigurable<OUT> typeConfigurable = (OutputTypeConfigurable<OUT>) userFunction;
+			typeConfigurable.setOutputType(outTypeInfo, executionConfig);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Getters for testing
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 368b8fa..7762101 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -30,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -74,7 +75,7 @@ import java.util.Set;
  */
 public class WindowOperator<K, IN, OUT, W extends Window>
 		extends AbstractUdfStreamOperator<OUT, WindowFunction<IN, OUT, K, W>>
-		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
+		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable, OutputTypeConfigurable<OUT> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -342,6 +343,15 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 		return this;
 	}
 
+	@Override
+	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+		if (userFunction instanceof OutputTypeConfigurable) {
+			@SuppressWarnings("unchecked")
+			OutputTypeConfigurable<OUT> typeConfigurable = (OutputTypeConfigurable<OUT>) userFunction;
+			typeConfigurable.setOutputType(outTypeInfo, executionConfig);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Getters for testing
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
new file mode 100644
index 0000000..45649bd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
@@ -0,0 +1,191 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for Folds over windows. These also test whether OutputTypeConfigurable functions
+ * work for windows, because FoldWindowFunction is OutputTypeConfigurable.
+ */
+public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
+
+	private static List<String> testResults;
+
+	@Test
+	public void testFoldWindow() throws Exception {
+
+		testResults = Lists.newArrayList();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple2.of("a", 0));
+				ctx.collect(Tuple2.of("a", 1));
+				ctx.collect(Tuple2.of("a", 2));
+
+				ctx.collect(Tuple2.of("b", 3));
+				ctx.collect(Tuple2.of("b", 4));
+				ctx.collect(Tuple2.of("b", 5));
+
+				ctx.collect(Tuple2.of("a", 6));
+				ctx.collect(Tuple2.of("a", 7));
+				ctx.collect(Tuple2.of("a", 8));
+			}
+
+			@Override
+			public void cancel() {
+			}
+		}).extractTimestamp(new Tuple2TimestampExtractor());
+
+		source1
+				.keyBy(0)
+				.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+				.fold(Tuple2.of("R:", 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+					@Override
+					public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator,
+							Tuple2<String, Integer> value) throws Exception {
+						accumulator.f0 += value.f0;
+						accumulator.f1 += value.f1;
+						return accumulator;
+					}
+				})
+				.addSink(new SinkFunction<Tuple2<String, Integer>>() {
+					@Override
+					public void invoke(Tuple2<String, Integer> value) throws Exception {
+						testResults.add(value.toString());
+					}
+				});
+
+		env.execute("Fold Window Test");
+
+		List<String> expectedResult = Lists.newArrayList(
+				"(R:aaa,3)",
+				"(R:aaa,21)",
+				"(R:bbb,12)");
+
+		Collections.sort(expectedResult);
+		Collections.sort(testResults);
+
+		Assert.assertEquals(expectedResult, testResults);
+	}
+
+	@Test
+	public void testFoldAllWindow() throws Exception {
+
+		testResults = Lists.newArrayList();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple2.of("a", 0));
+				ctx.collect(Tuple2.of("a", 1));
+				ctx.collect(Tuple2.of("a", 2));
+
+				ctx.collect(Tuple2.of("b", 3));
+				ctx.collect(Tuple2.of("a", 3));
+				ctx.collect(Tuple2.of("b", 4));
+				ctx.collect(Tuple2.of("a", 4));
+				ctx.collect(Tuple2.of("b", 5));
+				ctx.collect(Tuple2.of("a", 5));
+
+			}
+
+			@Override
+			public void cancel() {
+			}
+		}).extractTimestamp(new Tuple2TimestampExtractor());
+
+		source1
+				.windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+				.fold(Tuple2.of("R:", 0), new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
+					@Override
+					public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator,
+							Tuple2<String, Integer> value) throws Exception {
+						accumulator.f0 += value.f0;
+						accumulator.f1 += value.f1;
+						return accumulator;
+					}
+				})
+				.addSink(new SinkFunction<Tuple2<String, Integer>>() {
+					@Override
+					public void invoke(Tuple2<String, Integer> value) throws Exception {
+						testResults.add(value.toString());
+					}
+				});
+
+		env.execute("Fold All-Window Test");
+
+		List<String> expectedResult = Lists.newArrayList(
+				"(R:aaa,3)",
+				"(R:bababa,24)");
+
+		Collections.sort(expectedResult);
+		Collections.sort(testResults);
+
+		Assert.assertEquals(expectedResult, testResults);
+	}
+
+	private static class Tuple2TimestampExtractor implements TimestampExtractor<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long extractTimestamp(Tuple2<String, Integer> element, long currentTimestamp) {
+			return element.f1;
+		}
+
+		@Override
+		public long emitWatermark(Tuple2<String, Integer> element, long currentTimestamp) {
+			return element.f1 - 1;
+		}
+
+		@Override
+		public long getCurrentWatermark() {
+			return Long.MIN_VALUE;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index d2d0a1d..65cafb7 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
@@ -122,6 +122,47 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
   }
 
   /**
+   * Applies the given fold function to each window. Every evaluation of the window folds its
+   * elements, starting from a copy of the initial value; the results form a non-windowed stream.
+   *
+   * @param initialValue The initial value of the fold.
+   * @param function The fold function.
+   * @return The data stream that is the result of applying the fold function to the window.
+   */
+  def fold[R: TypeInformation: ClassTag](
+      initialValue: R,
+      function: FoldFunction[T,R]): DataStream[R] = {
+    if (function == null) {
+      throw new NullPointerException("Fold function must not be null.")
+    }
+
+    val resultType : TypeInformation[R] = implicitly[TypeInformation[R]]
+
+    javaStream.fold(initialValue, function, resultType)
+  }
+
+  /**
+   * Applies the given fold function to each window. Every evaluation of the window folds its
+   * elements, starting from a copy of the initial value; the results form a non-windowed stream.
+   *
+   * @param initialValue The initial value of the fold.
+   * @param function The fold function, given as a Scala function (accumulator, element) => result.
+   * @return The data stream that is the result of applying the fold function to the window.
+   */
+  def fold[R: TypeInformation: ClassTag](initialValue: R, function: (R, T) => R): DataStream[R] = {
+    if (function == null) {
+      throw new NullPointerException("Fold function must not be null.")
+    }
+    val cleanFun = clean(function)
+    val folder = new FoldFunction[T,R] {
+      def fold(acc: R, v: T) = {
+        cleanFun(acc, v)
+      }
+    }
+    fold(initialValue, folder)
+  }
+
+  /**
    * Applies the given window function to each window. The window function is called for each
    * evaluation of the window for each key individually. The output of the window function is
    * interpreted as a regular non-windowed stream.
http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index 3963765..a8ddaf8 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream} import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType @@ -125,6 +125,47 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { } /** + * Applies the given fold function to each evaluation of the window, for each key individually. + * The output of the fold function is interpreted as a regular non-windowed stream. + * + * @param initialValue The accumulator value that the fold starts from in each window. + * @param function The fold function. + * @return The data stream that is the result of applying the fold function to the window.
+ */ + def fold[R: TypeInformation: ClassTag]( + initialValue: R, + function: FoldFunction[T,R]): DataStream[R] = { + if (function == null) { + throw new NullPointerException("Fold function must not be null.") + } + + val resultType : TypeInformation[R] = implicitly[TypeInformation[R]] + + javaStream.fold(initialValue, function, resultType) + } + + /** + * Applies the given fold function to each evaluation of the window, for each key individually. + * The output of the fold function is interpreted as a regular non-windowed stream. + * + * @param initialValue The accumulator value that the fold starts from in each window. + * @param function The fold function, called as function(accumulator, element). + * @return The data stream that is the result of applying the fold function to the window. + */ + def fold[R: TypeInformation: ClassTag](initialValue: R, function: (R, T) => R): DataStream[R] = { + if (function == null) { + throw new NullPointerException("Fold function must not be null.") + } + val cleanFun = clean(function) + val folder = new FoldFunction[T,R] { /* adapts the Scala function to the FoldFunction interface */ + def fold(acc: R, v: T) = { + cleanFun(acc, v) + } + } + fold(initialValue, folder) + } + + /** * Applies the given window function to each window. The window function is called for each * evaluation of the window for each key individually. The output of the window function is * interpreted as a regular non-windowed stream.
http://git-wip-us.apache.org/repos/asf/flink/blob/f9eea5e5/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala new file mode 100644 index 0000000..dd098a0 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala + +import java.util.concurrent.TimeUnit + +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.TimestampExtractor +import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Test +import org.junit.Assert._ + +import scala.collection.mutable + +/** + * Tests for Folds over windows. These also test whether OutputTypeConfigurable functions + * work for windows, because FoldWindowFunction is OutputTypeConfigurable. + */ +class WindowFoldITCase extends StreamingMultipleProgramsTestBase { + + @Test + def testFoldWindow(): Unit = { + WindowFoldITCase.testResults = mutable.MutableList() + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) + + val source1 = env.addSource(new SourceFunction[(String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, Int)]) { + ctx.collect(("a", 0)) + ctx.collect(("a", 1)) + ctx.collect(("a", 2)) + ctx.collect(("b", 3)) + ctx.collect(("b", 4)) + ctx.collect(("b", 5)) + ctx.collect(("a", 6)) + ctx.collect(("a", 7)) + ctx.collect(("a", 8)) + } + + def cancel() { /* nothing to stop: run() emits a fixed set of elements and returns */ + } + }).extractTimestamp(new WindowFoldITCase.Tuple2TimestampExtractor) + + source1 + .keyBy(0) + .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .fold(("R:", 0), { (acc: (String, Int), v: (String, Int)) => (acc._1 + v._1, acc._2 + v._2) }) + .addSink(new SinkFunction[(String, Int)]() { + def invoke(value: (String, Int)) { + WindowFoldITCase.testResults += value.toString + } + }) + + env.execute("Fold Window Test") + + val expectedResult = 
mutable.MutableList( /* one result per key and 3 ms window: a at 0-2 sums to 3, b at 3-5 to 12, a at 6-8 to 21 */ + "(R:aaa,3)", + "(R:aaa,21)", + "(R:bbb,12)") + + assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted) + } + + @Test + def testFoldAllWindow(): Unit = { + WindowFoldITCase.testResults = mutable.MutableList() + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) + + val source1 = env.addSource(new SourceFunction[(String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, Int)]) { + ctx.collect(("a", 0)) + ctx.collect(("a", 1)) + ctx.collect(("a", 2)) + ctx.collect(("b", 3)) + ctx.collect(("a", 3)) + ctx.collect(("b", 4)) + ctx.collect(("a", 4)) + ctx.collect(("b", 5)) + ctx.collect(("a", 5)) + } + + def cancel() { /* nothing to stop: run() emits a fixed set of elements and returns */ + } + }).extractTimestamp(new WindowFoldITCase.Tuple2TimestampExtractor) + + source1 + .windowAll(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .fold(("R:", 0), { (acc: (String, Int), v: (String, Int)) => (acc._1 + v._1, acc._2 + v._2) }) + .addSink(new SinkFunction[(String, Int)]() { + def invoke(value: (String, Int)) { + WindowFoldITCase.testResults += value.toString + } + }) + + env.execute("Fold All-Window Test") + + val expectedResult = mutable.MutableList( /* keys are mixed: window 0-2 holds only a (sum 3), window 3-5 interleaves b and a (sum 24) */ + "(R:aaa,3)", + "(R:bababa,24)") + + assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted) + } + +} + + +object WindowFoldITCase { + private var testResults: mutable.MutableList[String] = null /* filled by the test sinks; reset at the start of each test */ + + private class Tuple2TimestampExtractor extends TimestampExtractor[(String, Int)] { /* event time is the Int field of each tuple */ + def extractTimestamp(element: (String, Int), currentTimestamp: Long): Long = { + element._2 + } + + def emitWatermark(element: (String, Int), currentTimestamp: Long): Long = { /* watermark trails the element's timestamp by one */ + element._2 - 1 + } + + def getCurrentWatermark: Long = { + Long.MinValue + } + } +}
