Repository: flink Updated Branches: refs/heads/release-1.0 603f351e2 -> ba069f35b
http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala new file mode 100644 index 0000000..4e77d83 --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.scala.function + +import java.io.Serializable + +import org.apache.flink.annotation.Public +import org.apache.flink.api.common.functions.Function +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + +/** + * Base interface for functions that are evaluated over non-keyed windows. + * + * @tparam IN The type of the input value. + * @tparam OUT The type of the output value. 
+ */ +@Public +trait AllWindowFunction[IN, OUT, W <: Window] extends Function with Serializable { + + /** + * Evaluates the window and outputs none or several elements. + * + * @param window The window that is being evaluated. + * @param input The elements in the window being evaluated. + * @param out A collector for emitting elements. + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + def apply(window: W, input: Iterable[IN], out: Collector[OUT]) +} http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/WindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/WindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/WindowFunction.scala new file mode 100644 index 0000000..67236b7 --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/WindowFunction.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.scala.function + +import java.io.Serializable + +import org.apache.flink.annotation.Public +import org.apache.flink.api.common.functions.Function +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + +/** + * Base interface for functions that are evaluated over keyed (grouped) windows. + * + * @tparam IN The type of the input value. + * @tparam OUT The type of the output value. + * @tparam KEY The type of the key. + */ +@Public +trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable { + + /** + * Evaluates the window and outputs none or several elements. + * + * @param key The key for which this window is evaluated. + * @param window The window that is being evaluated. + * @param input The elements in the window being evaluated. + * @param out A collector for emitting elements. + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. 
+ */ + def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]) +} http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index dcdfa91..f73307c 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -25,7 +25,7 @@ import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.state.ReducingStateDescriptor import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, AllWindowFunction} +import org.apache.flink.streaming.api.scala.function.{WindowFunction, AllWindowFunction} import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingTimeWindows, SlidingTimeWindows} import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor} @@ -76,7 +76,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { .windowAll(SlidingTimeWindows.of( Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), TimeWindow]() { + .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { def apply( window: TimeWindow, values: Iterable[(String, Int)], @@ -122,7 +122,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase { val window2 = source .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) - .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), TimeWindow]() { + .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { def apply( window: TimeWindow, values: Iterable[(String, Int)], @@ -173,7 +173,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(CountEvictor.of(1000)) - .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), TimeWindow]() { + .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { def apply( window: TimeWindow, values: Iterable[(String, Int)], @@ -211,7 +211,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { def apply( tuple: Tuple, window: TimeWindow, - values: (String, Int), + values: Iterable[(String, Int)], out: Collector[(String, Int)]) { } }) @@ -236,7 +236,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { def apply( tuple: Tuple, window: TimeWindow, - values: (String, Int), + values: Iterable[(String, Int)], out: Collector[(String, Int)]) { } }) http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala index 673d7b3..48ff640 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala +++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import org.apache.flink.api.common.state.{ListStateDescriptor, ReducingStateDescriptor} import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.streaming.api.functions.windowing.WindowFunction +import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingTimeWindows, SlidingTimeWindows} import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor} @@ -65,7 +65,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { val window2 = source .keyBy(0) .timeWindow(Time.minutes(1)) - .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, TimeWindow]() { + .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { def apply( key: Tuple, window: TimeWindow, @@ -114,7 +114,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { .keyBy(0) .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) - .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, TimeWindow]() { + .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { def apply( tuple: Tuple, window: TimeWindow, @@ -168,7 +168,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(CountEvictor.of(1000)) - .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, TimeWindow]() { + .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { def apply( tuple: Tuple, window: TimeWindow, @@ -207,7 +207,7 @@ class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase { def apply( tuple: Tuple, window: TimeWindow, - values: (String, Int), + values: Iterable[(String, Int)], out: Collector[(String, Int)]) { } }) @@ -232,7 +232,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { def apply( tuple: Tuple, window: TimeWindow, - values: (String, Int), + values: Iterable[(String, Int)], out: Collector[(String, Int)]) { } }) http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index 436dd0d..d18a45e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -105,7 +105,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { NUM_ELEMENTS_PER_KEY / 3)) .rebalance() .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS)) - .apply(new RichAllWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() { + .apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() { private boolean open = false; @@ -167,7 +167,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) .rebalance() .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS)) - .apply(new RichAllWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() { + .apply(new 
RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() { private boolean open = false; @@ -254,13 +254,18 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { @Override public void apply( TimeWindow window, - Tuple2<Long, IntType> input, + Iterable<Tuple2<Long, IntType>> input, Collector<Tuple4<Long, Long, Long, IntType>> out) { // validate that the function has been opened properly assertTrue(open); - out.collect(new Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1)); + for (Tuple2<Long, IntType> in: input) { + out.collect(new Tuple4<>(in.f0, + window.getStart(), + window.getEnd(), + in.f1)); + } } }) .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1); @@ -323,13 +328,18 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { @Override public void apply( TimeWindow window, - Tuple2<Long, IntType> input, + Iterable<Tuple2<Long, IntType>> input, Collector<Tuple4<Long, Long, Long, IntType>> out) { // validate that the function has been opened properly assertTrue(open); - out.collect(new Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1)); + for (Tuple2<Long, IntType> in: input) { + out.collect(new Tuple4<>(in.f0, + window.getStart(), + window.getEnd(), + in.f1)); + } } }) .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1); http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index 6f178d8..ce705e1 100644 --- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -152,7 +152,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { .rebalance() .keyBy(0) .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS)) - .apply(new RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { + .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { private boolean open = false; @@ -216,7 +216,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { .rebalance() .keyBy(0) .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS)) - .apply(new RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { + .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { private boolean open = false; @@ -285,7 +285,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { .rebalance() .keyBy(0) .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS)) - .apply(new RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { + .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { private boolean open = false; @@ -373,13 +373,18 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { public void apply( Tuple tuple, TimeWindow window, - Tuple2<Long, IntType> input, + Iterable<Tuple2<Long, IntType>> input, Collector<Tuple4<Long, Long, Long, IntType>> out) { // validate that the function has been opened properly assertTrue(open); - out.collect(new Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1)); + for (Tuple2<Long, IntType> in: input) { + 
out.collect(new Tuple4<>(in.f0, + window.getStart(), + window.getEnd(), + in.f1)); + } } }) .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1); @@ -443,13 +448,18 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { public void apply( Tuple tuple, TimeWindow window, - Tuple2<Long, IntType> input, + Iterable<Tuple2<Long, IntType>> input, Collector<Tuple4<Long, Long, Long, IntType>> out) { // validate that the function has been opened properly assertTrue(open); - out.collect(new Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1)); + for (Tuple2<Long, IntType> in: input) { + out.collect(new Tuple4<>(in.f0, + window.getStart(), + window.getEnd(), + in.f1)); + } } }) .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1); http://git-wip-us.apache.org/repos/asf/flink/blob/ba069f35/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java index eb5ef5a..aa5ff3b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java @@ -117,7 +117,7 @@ public class WindowCheckpointingITCase extends TestLogger { .rebalance() .keyBy(0) .timeWindow(Time.of(100, MILLISECONDS)) - .apply(new RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple2<Long, IntType>, Tuple, TimeWindow>() { + .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, TimeWindow>() { private boolean open = false; @@ -175,7 +175,7 @@ public class WindowCheckpointingITCase extends TestLogger { .rebalance() .keyBy(0) 
.timeWindow(Time.of(150, MILLISECONDS), Time.of(50, MILLISECONDS)) - .apply(new RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple2<Long, IntType>, Tuple, TimeWindow>() { + .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, TimeWindow>() { private boolean open = false;
