http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index b94e530..9d4a41a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -18,20 +18,22 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory; +import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction; import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; @@ -42,57 +44,25 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import java.util.Arrays; -import java.util.Collection; import java.util.Comparator; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -@RunWith(Parameterized.class) public class WindowOperatorTest { - @SuppressWarnings("unchecked,rawtypes") - private WindowBufferFactory windowBufferFactory; - - public WindowOperatorTest(WindowBufferFactory<?, ?> windowBufferFactory) { - this.windowBufferFactory = windowBufferFactory; - } - // For counting if close() is called the correct number of times on the SumReducer private static AtomicInteger closeCalled = new AtomicInteger(0); - @Test - @SuppressWarnings("unchecked") - public void testSlidingEventTimeWindows() throws Exception { - closeCalled.set(0); - - final int WINDOW_SIZE = 3; - final int WINDOW_SLIDE = 1; - - WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), - new TimeWindow.Serializer(), - new TupleKeySelector(), - BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), - windowBufferFactory, - new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()), - EventTimeTrigger.create()); - - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); - - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>(operator); + private void testSlidingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception { long initialTime = 0L; - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - testHarness.open(); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); // add elements out-of-order testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999)); @@ -148,37 +118,84 @@ public class WindowOperatorTest { expectedOutput.add(new Watermark(7999)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); - - testHarness.close(); - if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) { - Assert.assertEquals("Close was not called.", 2, closeCalled.get()); - } else { - Assert.assertEquals("Close was not called.", 1, closeCalled.get()); - } } @Test @SuppressWarnings("unchecked") - public void testTumblingEventTimeWindows() throws Exception { + public void testSlidingEventTimeWindowsReduce() throws Exception { closeCalled.set(0); final int WINDOW_SIZE = 3; + final int WINDOW_SLIDE = 1; - WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( - TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), new TimeWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), - windowBufferFactory, - new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()), + stateDesc, + new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(), EventTimeTrigger.create()); - operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); - + operator.setInputType(inputType, new ExecutionConfig()); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new OneInputStreamOperatorTestHarness<>(operator); + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.open(); + + testSlidingEventTimeWindows(testHarness); + + testHarness.close(); + } + + @Test + @SuppressWarnings("unchecked") + public void testSlidingEventTimeWindowsApply() throws Exception { + closeCalled.set(0); + + final int WINDOW_SIZE = 3; + final int WINDOW_SLIDE = 1; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new RichSumReducer<TimeWindow>(), + EventTimeTrigger.create()); + + operator.setInputType(inputType, new ExecutionConfig()); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.open(); + + testSlidingEventTimeWindows(testHarness); + + testHarness.close(); + + Assert.assertEquals("Close was not called.", 1, closeCalled.get()); + } + + private void testTumblingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception { long initialTime = 0L; ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); @@ -233,13 +250,79 @@ public class WindowOperatorTest { expectedOutput.add(new Watermark(7999)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + } + + @Test + @SuppressWarnings("unchecked") + public void testTumblingEventTimeWindowsReduce() throws Exception { + closeCalled.set(0); + + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(), + EventTimeTrigger.create()); + + operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.open(); + + testTumblingEventTimeWindows(testHarness); testHarness.close(); - if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) { - Assert.assertEquals("Close was not called.", 2, closeCalled.get()); - } else { - Assert.assertEquals("Close was not called.", 1, closeCalled.get()); - } + } + + @Test + @SuppressWarnings("unchecked") + public void testTumblingEventTimeWindowsApply() throws Exception { + closeCalled.set(0); + + final int WINDOW_SIZE = 3; + + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents", + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>( + TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new RichSumReducer<TimeWindow>(), + EventTimeTrigger.create()); + + operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); + + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.open(); + + testTumblingEventTimeWindows(testHarness); + + testHarness.close(); + + Assert.assertEquals("Close was not called.", 1, closeCalled.get()); } @Test @@ -249,13 +332,19 @@ public class WindowOperatorTest { final int WINDOW_SIZE = 3; - WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>( + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>( GlobalWindows.create(), new GlobalWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), - windowBufferFactory, - new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()), + stateDesc, + new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(), ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS))); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig()); @@ -263,6 +352,8 @@ public class WindowOperatorTest { OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new OneInputStreamOperatorTestHarness<>(operator); + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + long initialTime = 0L; ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); @@ -322,11 +413,6 @@ public class WindowOperatorTest { TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); testHarness.close(); - if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) { - Assert.assertEquals("Close was not called.", 2, closeCalled.get()); - } else { - Assert.assertEquals("Close was not called.", 1, closeCalled.get()); - } } @Test @@ -336,13 +422,19 @@ public class WindowOperatorTest { final int WINDOW_SIZE = 4; - WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>( + TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + + ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>( GlobalWindows.create(), new GlobalWindow.Serializer(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), - windowBufferFactory, - new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()), + stateDesc, + new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(), PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE))); operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse( @@ -351,6 +443,8 @@ public class WindowOperatorTest { OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new OneInputStreamOperatorTestHarness<>(operator); + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + long initialTime = 0L; ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); @@ -387,19 +481,23 @@ public class WindowOperatorTest { TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); testHarness.close(); - if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) { - Assert.assertEquals("Close was not called.", 2, closeCalled.get()); - } else { - Assert.assertEquals("Close was not called.", 1, closeCalled.get()); - } - } // ------------------------------------------------------------------------ // UDFs // ------------------------------------------------------------------------ - public static class SumReducer extends RichReduceFunction<Tuple2<String, Integer>> { + public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>> { + private static final long serialVersionUID = 1L; + @Override + public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, + Tuple2<String, Integer> value2) throws Exception { + return new Tuple2<>(value2.f0, value1.f1 + value2.f1); + } + } + + + public static class RichSumReducer<W extends Window> extends RichWindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, String, W> { private static final long serialVersionUID = 1L; private boolean openCalled = false; @@ -417,24 +515,23 @@ public class WindowOperatorTest { } @Override - public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, - Tuple2<String, Integer> value2) throws Exception { + public void apply(String key, + W window, + Iterable<Tuple2<String, Integer>> input, + Collector<Tuple2<String, Integer>> out) throws Exception { + if (!openCalled) { Assert.fail("Open was not called"); } - return new Tuple2<>(value2.f0, value1.f1 + value2.f1); + int sum = 0; + + for (Tuple2<String, Integer> t: input) { + sum += t.f1; + } + out.collect(new Tuple2<>(key, sum)); + } - } - // ------------------------------------------------------------------------ - // Parametrization for testing different window buffers - // ------------------------------------------------------------------------ - @Parameterized.Parameters(name = "WindowBuffer = {0}") - @SuppressWarnings("unchecked,rawtypes") - public static Collection<WindowBufferFactory[]> windowBuffers(){ - return Arrays.asList(new WindowBufferFactory[]{new PreAggregatingHeapWindowBuffer.Factory(new SumReducer())}, - new WindowBufferFactory[]{new HeapWindowBuffer.Factory()} - ); } @SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java index 13766a1..1e6e475 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java @@ -17,7 +17,10 @@ */ package org.apache.flink.streaming.runtime.operators.windowing; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -35,8 +38,6 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer; -import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; import org.junit.Assert; @@ -52,6 +53,29 @@ import java.util.concurrent.TimeUnit; public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { /** + * .reduce() does not support RichReduceFunction, since the reduce function is used internally + * in a {@code ReducingState}. + */ + @Test(expected = UnsupportedOperationException.class) + public void testReduceFailWithRichReducer() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + + DataStream<Tuple2<String, Integer>> window1 = source + .keyBy(0) + .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) + .reduce(new RichReduceFunction<Tuple2<String, Integer>>() { + @Override + public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, + Tuple2<String, Integer> value2) throws Exception { + return null; + } + }); + } + + /** * These tests ensure that the fast aligned time windows operator is used if the * conditions are right. */ @@ -76,7 +100,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { DataStream<Tuple2<String, Integer>> window2 = source .keyBy(0) .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { + .apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -118,12 +142,12 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Assert.assertFalse(winOperator1.isSetProcessingTime()); Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); - Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); + Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor); DataStream<Tuple2<String, Integer>> window2 = source .keyBy(0) .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) - .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { + .apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -142,7 +166,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Assert.assertFalse(winOperator2.isSetProcessingTime()); Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); - Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); + Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor); } @Test @@ -168,13 +192,13 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Assert.assertTrue(winOperator1.isSetProcessingTime()); Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger); Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); - Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory); + Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ReducingStateDescriptor); DataStream<Tuple2<String, Integer>> window2 = source .keyBy(0) .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) - .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { + .apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -193,7 +217,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Assert.assertTrue(winOperator2.isSetProcessingTime()); Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger); Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); - Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); + Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor); } @Test @@ -220,14 +244,14 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger); Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows); Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor); - Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); + Assert.assertTrue(winOperator1.getStateDescriptor() instanceof ListStateDescriptor); DataStream<Tuple2<String, Integer>> window2 = source .keyBy(0) .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS))) - .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { + .apply(new WindowFunction<Iterable<Tuple2<String, Integer>>, Tuple2<String, Integer>, Tuple, TimeWindow>() { private static final long serialVersionUID = 1L; @Override @@ -247,14 +271,14 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase { Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger); Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows); Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor); - Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory); + Assert.assertTrue(winOperator2.getStateDescriptor() instanceof ListStateDescriptor); } // ------------------------------------------------------------------------ // UDFs // ------------------------------------------------------------------------ - public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> { + public static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala index 0357144..90e63c4 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala @@ -176,8 +176,15 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - def apply[R: TypeInformation: ClassTag](function: AllWindowFunction[T, R, W]): DataStream[R] = { - javaStream.apply(clean(function), implicitly[TypeInformation[R]]) + def apply[R: TypeInformation: ClassTag]( + function: AllWindowFunction[Iterable[T], R, W]): DataStream[R] = { + val cleanedFunction = clean(function) + val javaFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] { + def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { + cleanedFunction(window, elements.asScala, out) + } + } + javaStream.apply(javaFunction, implicitly[TypeInformation[R]]) } /** @@ -194,7 +201,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { def apply[R: TypeInformation: ClassTag]( function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { val cleanedFunction = clean(function) - val applyFunction = new AllWindowFunction[T, R, W] { + val applyFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] { def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { cleanedFunction(window, elements.asScala, out) } @@ -232,7 +239,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { */ def apply[R: TypeInformation: ClassTag]( preAggregator: (T, T) => T, - function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { + function: (W, T, Collector[R]) => Unit): DataStream[R] = { if (function == null) { throw new NullPointerException("Reduce function must not be null.") } @@ -247,8 +254,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { val cleanApply = clean(function) val applyFunction = new AllWindowFunction[T, R, W] { - def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { - cleanApply(window, elements.asScala, out) + def apply(window: W, input: T, out: Collector[R]): Unit = { + cleanApply(window, input, out) } } javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]]) http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index 93b91ff..8a49f40 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -179,8 +179,15 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - def apply[R: TypeInformation: ClassTag](function: WindowFunction[T, R, K, W]): DataStream[R] = { - javaStream.apply(clean(function), implicitly[TypeInformation[R]]) + def apply[R: TypeInformation: ClassTag]( + function: WindowFunction[Iterable[T], R, K, W]): DataStream[R] = { + val cleanFunction = clean(function) + val javaFunction = new WindowFunction[java.lang.Iterable[T], R, K, W] { + def apply(key: K, window: W, input: java.lang.Iterable[T], out: Collector[R]) = { + cleanFunction.apply(key, window, input.asScala, out) + } + } + javaStream.apply(javaFunction, implicitly[TypeInformation[R]]) } /** @@ -201,7 +208,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { } val cleanedFunction = clean(function) - val applyFunction = new WindowFunction[T, R, K, W] { + val applyFunction = new WindowFunction[java.lang.Iterable[T], R, K, W] { def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { cleanedFunction(key, window, elements.asScala, out) } @@ -239,7 +246,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { */ def apply[R: TypeInformation: ClassTag]( preAggregator: (T, T) => T, - function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { + function: (K, W, T, Collector[R]) => Unit): DataStream[R] = { if (function == null) { throw new NullPointerException("Reduce function must not be null.") } @@ -254,8 +261,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { val cleanApply = clean(function) val applyFunction = new WindowFunction[T, R, K, W] { - def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { - cleanApply(key, window, elements.asScala, out) + def apply(key: K, window: W, input: T, out: Collector[R]): Unit = { + cleanApply(key, window, input, out) } } javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]]) http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index 4ec8f81..d293d1a 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -21,7 +21,8 @@ package org.apache.flink.streaming.api.scala import java.util.concurrent.TimeUnit -import org.apache.flink.api.common.functions.RichReduceFunction +import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.api.common.state.ReducingStateDescriptor import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.windowing.{WindowFunction, AllWindowFunction} @@ -75,12 +76,12 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { .windowAll(SlidingTimeWindows.of( Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { - def apply( - window: TimeWindow, - values: java.lang.Iterable[(String, Int)], - out: Collector[(String, Int)]) { } - }) + .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), TimeWindow]() { + def apply( + window: TimeWindow, + values: Iterable[(String, Int)], + out: Collector[(String, Int)]) { } + }) val transform2 = window2.javaStream.getTransformation .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]] @@ -121,10 +122,10 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val window2 = source .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) - .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { + .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), TimeWindow]() { def apply( window: TimeWindow, - values: java.lang.Iterable[(String, Int)], + values: Iterable[(String, Int)], out: Collector[(String, Int)]) { } }) @@ -172,10 +173,10 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(CountEvictor.of(1000)) - .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { + .apply(new AllWindowFunction[Iterable[(String, Int)], (String, Int), TimeWindow]() { def apply( window: TimeWindow, - values: java.lang.Iterable[(String, Int)], + values: Iterable[(String, Int)], out: Collector[(String, Int)]) { } }) @@ -210,7 +211,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { def apply( tuple: Tuple, window: TimeWindow, - values: java.lang.Iterable[(String, Int)], + values: (String, Int), out: Collector[(String, Int)]) { } }) @@ -219,12 +220,12 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val operator1 = transform1.getOperator - assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]]) - val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]] + assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]]) + val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]] assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows]) assertTrue( - winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]]) + winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) val window2 = source @@ -235,7 +236,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { def apply( tuple: Tuple, window: TimeWindow, - values: java.lang.Iterable[(String, Int)], + values: (String, Int), out: Collector[(String, Int)]) { } }) @@ -244,12 +245,12 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val operator2 = transform2.getOperator - assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]]) - val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]] + assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]]) + val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]] assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows]) assertTrue( - winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]]) + winOperator2.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) } } @@ -258,7 +259,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { // UDFs // ------------------------------------------------------------------------ -class DummyReducer extends RichReduceFunction[(String, Int)] { +class DummyReducer extends ReduceFunction[(String, Int)] { def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = { value1 } http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala index 90cce66..dfb5ea2 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala import java.util.concurrent.TimeUnit +import org.apache.flink.api.common.state.{ListStateDescriptor, ReducingStateDescriptor} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.functions.windowing.WindowFunction import org.apache.flink.streaming.api.transformations.OneInputTransformation @@ -28,7 +29,6 @@ import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvic import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger} import org.apache.flink.streaming.api.windowing.windows.TimeWindow -import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer} import org.apache.flink.streaming.runtime.operators.windowing.{EvictingWindowOperator, WindowOperator, AccumulatingProcessingTimeWindowOperator, AggregatingProcessingTimeWindowOperator} import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.util.Collector @@ -69,11 +69,11 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { .window(SlidingTimeWindows.of( Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) - .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { + .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, TimeWindow]() { def apply( key: Tuple, window: TimeWindow, - values: java.lang.Iterable[(String, Int)], + values: Iterable[(String, Int)], out: Collector[(String, Int)]) { } }) @@ -106,23 +106,23 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { val operator1 = transform1.getOperator - assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]]) - val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]] + assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]]) + val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]] assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows]) assertTrue( - winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]]) + winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) val window2 = source .keyBy(0) .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) - .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { + .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, TimeWindow]() { def apply( tuple: Tuple, window: TimeWindow, - values: java.lang.Iterable[(String, Int)], + values: Iterable[(String, Int)], out: Collector[(String, Int)]) { } }) @@ -131,11 +131,11 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { val operator2 = transform2.getOperator - assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]]) - val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]] + assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]]) + val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]] assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows]) - assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) + assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) } @Test @@ -164,7 +164,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { assertTrue(winOperator1.getTrigger.isInstanceOf[ProcessingTimeTrigger]) assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]]) assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows]) - assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) + assertTrue(winOperator1.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) val window2 = source @@ -172,11 +172,11 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(CountEvictor.of(1000)) - .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() { + .apply(new WindowFunction[Iterable[(String, Int)], (String, Int), Tuple, TimeWindow]() { def apply( tuple: Tuple, window: TimeWindow, - values: java.lang.Iterable[(String, Int)], + values: Iterable[(String, Int)], out: Collector[(String, Int)]) { } }) @@ -190,7 +190,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]]) assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows]) - assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]]) + assertTrue(winOperator2.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]]) } @Test @@ -211,7 +211,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { def apply( tuple: Tuple, window: TimeWindow, - values: java.lang.Iterable[(String, Int)], + values: (String, Int), out: Collector[(String, Int)]) { } }) @@ -220,12 +220,12 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { val operator1 = transform1.getOperator - assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]]) - val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]] + assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]]) + val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _, _]] assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows]) assertTrue( - winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]]) + winOperator1.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) val window2 = source @@ -236,7 +236,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { def apply( tuple: Tuple, window: TimeWindow, - values: java.lang.Iterable[(String, Int)], + values: (String, Int), out: Collector[(String, Int)]) { } }) @@ -245,11 +245,11 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase { val operator2 = transform2.getOperator - assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]]) - val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]] + assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]]) + val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _, _]] assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows]) assertTrue( - winOperator2.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]]) + winOperator2.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]]) } } http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index 18c1b3c..9eca074 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -18,13 +18,13 @@ package org.apache.flink.test.checkpointing; -import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -105,7 +105,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { NUM_ELEMENTS_PER_KEY / 3)) .rebalance() .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS)) - .apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() { + .apply(new RichAllWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() { private boolean open = false; @@ -167,7 +167,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) .rebalance() .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS)) - .apply(new RichAllWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() { + .apply(new RichAllWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, IntType>, TimeWindow>() { private boolean open = false; @@ -231,23 +231,13 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { .rebalance() .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS)) .apply( - new RichReduceFunction<Tuple2<Long, IntType>>() { - - private boolean open = false; - - @Override - public void open(Configuration parameters) { - assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); - open = true; - } + new ReduceFunction<Tuple2<Long, IntType>>() { @Override public Tuple2<Long, IntType> reduce( Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) { - // validate that the function has been opened properly - assertTrue(open); return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value)); } }, @@ -264,20 +254,13 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { @Override public void apply( TimeWindow window, - Iterable<Tuple2<Long, IntType>> values, + Tuple2<Long, IntType> input, Collector<Tuple4<Long, Long, Long, IntType>> out) { // validate that the function has been opened properly assertTrue(open); - int sum = 0; - long key = -1; - - for (Tuple2<Long, IntType> value : values) { - sum += value.f1.value; - key = value.f0; - } - out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum))); + out.collect(new Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1)); } }) .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1); @@ -317,23 +300,13 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { .timeWindowAll(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS)) .apply( - new RichReduceFunction<Tuple2<Long, IntType>>() { - - private boolean open = false; - - @Override - public void open(Configuration parameters) { - assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); - open = true; - } + new ReduceFunction<Tuple2<Long, IntType>>() { @Override public Tuple2<Long, IntType> reduce( Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) { - // validate that the function has been opened properly - assertTrue(open); return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value)); } }, @@ -350,20 +323,13 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { @Override public void apply( TimeWindow window, - Iterable<Tuple2<Long, IntType>> values, + Tuple2<Long, IntType> input, Collector<Tuple4<Long, Long, Long, IntType>> out) { // validate that the function has been opened properly assertTrue(open); - int sum = 0; - long key = -1; - - for (Tuple2<Long, IntType> value : values) { - sum += value.f1.value; - key = value.f0; - } - out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum))); + out.collect(new Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1)); } }) .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1); http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index 7a1a879..5886982 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -18,15 +18,18 @@ package org.apache.flink.test.checkpointing; -import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -40,9 +43,17 @@ import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -55,12 +66,22 @@ import static org.junit.Assert.*; * of the emitted windows are deterministic. */ @SuppressWarnings("serial") +@RunWith(Parameterized.class) public class EventTimeWindowCheckpointingITCase extends TestLogger { private static final int PARALLELISM = 4; private static ForkableFlinkMiniCluster cluster; + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + private StateBackendEnum stateBackendEnum; + private AbstractStateBackend stateBackend; + + public EventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) { + this.stateBackendEnum = stateBackendEnum; + } @BeforeClass public static void startTestCluster() { @@ -81,6 +102,19 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { } } + @Before + public void initStateBackend() throws IOException { + switch (stateBackendEnum) { + case MEM: + this.stateBackend = new MemoryStateBackend(); + break; + case FILE: + String backups = tempFolder.newFolder().getAbsolutePath(); + this.stateBackend = new FsStateBackend("file://" + backups); + break; + } + } + // ------------------------------------------------------------------------ @Test @@ -99,13 +133,14 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { env.enableCheckpointing(100); env.setNumberOfExecutionRetries(3); env.getConfig().disableSysoutLogging(); + env.setStateBackend(this.stateBackend); env .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) .rebalance() .keyBy(0) .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS)) - .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { + .apply(new RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { private boolean open = false; @@ -162,13 +197,14 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { env.enableCheckpointing(100); env.setNumberOfExecutionRetries(3); env.getConfig().disableSysoutLogging(); + env.setStateBackend(this.stateBackend); env .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) .rebalance() .keyBy(0) .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS)) - .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { + .apply(new RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { private boolean open = false; @@ -229,13 +265,14 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { env.enableCheckpointing(100); env.setNumberOfExecutionRetries(3); env.getConfig().disableSysoutLogging(); + env.setStateBackend(this.stateBackend); env .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) .rebalance() .keyBy(0) .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS)) - .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { + .apply(new RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() { private boolean open = false; @@ -292,6 +329,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { env.enableCheckpointing(100); env.setNumberOfExecutionRetries(3); env.getConfig().disableSysoutLogging(); + env.setStateBackend(this.stateBackend); env .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) @@ -299,23 +337,12 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { .keyBy(0) .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS)) .apply( - new RichReduceFunction<Tuple2<Long, IntType>>() { - - private boolean open = false; - - @Override - public void open(Configuration parameters) { - assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); - open = true; - } + new ReduceFunction<Tuple2<Long, IntType>>() { @Override public Tuple2<Long, IntType> reduce( Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) { - - // validate that the function has been opened properly - assertTrue(open); return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value)); } }, @@ -333,20 +360,13 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { public void apply( Tuple tuple, TimeWindow window, - Iterable<Tuple2<Long, IntType>> values, + Tuple2<Long, IntType> input, Collector<Tuple4<Long, Long, Long, IntType>> out) { // validate that the function has been opened properly assertTrue(open); - int sum = 0; - long key = -1; - - for (Tuple2<Long, IntType> value : values) { - sum += value.f1.value; - key = value.f0; - } - out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum))); + out.collect(new Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1)); } }) .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SIZE)).setParallelism(1); @@ -377,6 +397,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { env.enableCheckpointing(100); env.setNumberOfExecutionRetries(3); env.getConfig().disableSysoutLogging(); + env.setStateBackend(this.stateBackend); env .addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3)) @@ -384,15 +405,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { .keyBy(0) .timeWindow(Time.of(WINDOW_SIZE, MILLISECONDS), Time.of(WINDOW_SLIDE, MILLISECONDS)) .apply( - new RichReduceFunction<Tuple2<Long, IntType>>() { - - private boolean open = false; - - @Override - public void open(Configuration parameters) { - assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); - open = true; - } + new ReduceFunction<Tuple2<Long, IntType>>() { @Override public Tuple2<Long, IntType> reduce( @@ -400,7 +413,6 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { Tuple2<Long, IntType> b) { // validate that the function has been opened properly - assertTrue(open); return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value)); } }, @@ -418,20 +430,13 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { public void apply( Tuple tuple, TimeWindow window, - Iterable<Tuple2<Long, IntType>> values, + Tuple2<Long, IntType> input, Collector<Tuple4<Long, Long, Long, IntType>> out) { // validate that the function has been opened properly assertTrue(open); - int sum = 0; - long key = -1; - - for (Tuple2<Long, IntType> value : values) { - sum += value.f1.value; - key = value.f0; - } - out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum))); + out.collect(new Tuple4<>(input.f0, window.getStart(), window.getEnd(), input.f1)); } }) .addSink(new ValidatingSink(NUM_KEYS, NUM_ELEMENTS_PER_KEY / WINDOW_SLIDE)).setParallelism(1); @@ -583,7 +588,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { } } } - assertTrue("The source must see all expected windows.", seenAll); + assertTrue("The sink must see all expected windows.", seenAll); } @Override @@ -723,6 +728,25 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { } } + // ------------------------------------------------------------------------ + // Parametrization for testing with different state backends + // ------------------------------------------------------------------------ + + + @Parameterized.Parameters(name = "StateBackend = {0}") + @SuppressWarnings("unchecked,rawtypes") + public static Collection<Object[]> parameters(){ + return Arrays.asList(new Object[][] { + {StateBackendEnum.MEM}, + {StateBackendEnum.FILE}, + } + ); + } + + private enum StateBackendEnum { + MEM, FILE, DB, ROCKSDB + } + // ------------------------------------------------------------------------ // Utilities http://git-wip-us.apache.org/repos/asf/flink/blob/67ca4a43/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java index 8d59975..c9286ce 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java @@ -19,15 +19,15 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -117,7 +117,7 @@ public class WindowCheckpointingITCase extends TestLogger { .rebalance() .keyBy(0) .timeWindow(Time.of(100, MILLISECONDS)) - .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, TimeWindow>() { + .apply(new RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple2<Long, IntType>, Tuple, TimeWindow>() { private boolean open = false; @@ -175,7 +175,7 @@ public class WindowCheckpointingITCase extends TestLogger { .rebalance() .keyBy(0) .timeWindow(Time.of(150, MILLISECONDS), Time.of(50, MILLISECONDS)) - .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple2<Long, IntType>, Tuple, TimeWindow>() { + .apply(new RichWindowFunction<Iterable<Tuple2<Long, IntType>>, Tuple2<Long, IntType>, Tuple, TimeWindow>() { private boolean open = false; @@ -240,23 +240,12 @@ public class WindowCheckpointingITCase extends TestLogger { .rebalance() .keyBy(0) .timeWindow(Time.of(100, MILLISECONDS)) - .reduce(new RichReduceFunction<Tuple2<Long, IntType>>() { - - private boolean open = false; - - @Override - public void open(Configuration parameters) { - assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); - open = true; - } + .reduce(new ReduceFunction<Tuple2<Long, IntType>>() { @Override public Tuple2<Long, IntType> reduce( Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) { - - // validate that the function has been opened properly - assertTrue(open); return new Tuple2<>(a.f0, new IntType(1)); } }) @@ -299,23 +288,11 @@ public class WindowCheckpointingITCase extends TestLogger { .rebalance() .keyBy(0) .timeWindow(Time.of(150, MILLISECONDS), Time.of(50, MILLISECONDS)) - .reduce(new RichReduceFunction<Tuple2<Long, IntType>>() { - - private boolean open = false; - - @Override - public void open(Configuration parameters) { - assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); - open = true; - } - + .reduce(new ReduceFunction<Tuple2<Long, IntType>>() { @Override public Tuple2<Long, IntType> reduce( Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) { - - // validate that the function has been opened properly - assertTrue(open); return new Tuple2<>(a.f0, new IntType(1)); } })
