Repository: flink Updated Branches: refs/heads/master 3a27f55cf -> 910f733f5
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java new file mode 100644 index 0000000..74fd044 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + + +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.functions.RichProcessFunction; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; + +/** + * Tests {@link ProcessOperator}. + */ +public class ProcessOperatorTest extends TestLogger { + + @Test + public void testTimestampAndWatermarkQuerying() throws Exception { + + ProcessOperator<Integer, Integer, String> operator = + new ProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME)); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark(new Watermark(17)); + testHarness.processElement(new StreamRecord<>(5, 12L)); + + testHarness.processWatermark(new Watermark(42)); + testHarness.processElement(new StreamRecord<>(6, 13L)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(17L)); + expectedOutput.add(new StreamRecord<>("5TIME:17 TS:12", 12L)); + expectedOutput.add(new Watermark(42L)); + expectedOutput.add(new StreamRecord<>("6TIME:42 TS:13", 13L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testTimestampAndProcessingTimeQuerying() throws Exception { + + ProcessOperator<Integer, Integer, String> operator = + new ProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME)); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(17); + testHarness.processElement(new StreamRecord<>(5)); + + testHarness.setProcessingTime(42); + testHarness.processElement(new StreamRecord<>(6)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("5TIME:17 TS:null")); + expectedOutput.add(new StreamRecord<>("6TIME:42 TS:null")); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testEventTimeTimers() throws Exception { + + ProcessOperator<Integer, Integer, Integer> operator = + new ProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME)); + + OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark(new Watermark(0)); + + testHarness.processElement(new StreamRecord<>(17, 42L)); + + testHarness.processWatermark(new Watermark(5)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(0L)); + expectedOutput.add(new StreamRecord<>(17, 42L)); + expectedOutput.add(new StreamRecord<>(1777, 5L)); + expectedOutput.add(new Watermark(5L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testProcessingTimeTimers() throws Exception { + + ProcessOperator<Integer, Integer, Integer> operator = + new ProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME)); + + OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(17)); + + testHarness.setProcessingTime(5); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>(17)); + expectedOutput.add(new StreamRecord<>(1777, 5L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + /** + * Verifies that we don't have leakage between different keys. + */ + @Test + public void testEventTimeTimerWithState() throws Exception { + + ProcessOperator<Integer, Integer, String> operator = + new ProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME)); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark(new Watermark(1)); + testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6 + + testHarness.processWatermark(new Watermark(2)); + testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7 + + testHarness.processWatermark(new Watermark(6)); + testHarness.processWatermark(new Watermark(7)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(1L)); + expectedOutput.add(new StreamRecord<>("INPUT:17", 0L)); + expectedOutput.add(new Watermark(2L)); + expectedOutput.add(new StreamRecord<>("INPUT:42", 1L)); + expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); + expectedOutput.add(new Watermark(6L)); + expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); + expectedOutput.add(new Watermark(7L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + /** + * Verifies that we don't have leakage between different keys. + */ + @Test + public void testProcessingTimeTimerWithState() throws Exception { + + ProcessOperator<Integer, Integer, String> operator = + new ProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME)); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(1); + testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6 + + testHarness.setProcessingTime(2); + testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7 + + testHarness.setProcessingTime(6); + testHarness.setProcessingTime(7); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("INPUT:17")); + expectedOutput.add(new StreamRecord<>("INPUT:42")); + expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); + expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testSnapshotAndRestore() throws Exception { + + ProcessOperator<Integer, Integer, String> operator = + new ProcessOperator<>(new BothTriggeringFlatMapFunction()); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(5, 12L)); + + // snapshot and restore from scratch + OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + + testHarness.close(); + + operator = new ProcessOperator<>(new BothTriggeringFlatMapFunction()); + + testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + + testHarness.setProcessingTime(5); + testHarness.processWatermark(new Watermark(6)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("PROC:1777", 5L)); + expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L)); + expectedOutput.add(new Watermark(6)); + + System.out.println("GOT: " + testHarness.getOutput()); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + private static class IdentityKeySelector<T> implements KeySelector<T, T> { + private static final long serialVersionUID = 1L; + + @Override + public T getKey(T value) throws Exception { + return value; + } + } + + private static class QueryingFlatMapFunction implements ProcessFunction<Integer, String> { + + private static final long serialVersionUID = 1L; + + private final TimeDomain timeDomain; + + public QueryingFlatMapFunction(TimeDomain timeDomain) { + this.timeDomain = timeDomain; + } + + @Override + public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception { + if (timeDomain.equals(TimeDomain.EVENT_TIME)) { + out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()); + } else { + out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()); + } + } + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector<String> out) throws Exception { + } + } + + private static class TriggeringFlatMapFunction implements ProcessFunction<Integer, Integer> { + + private static final long serialVersionUID = 1L; + + private final TimeDomain timeDomain; + + public TriggeringFlatMapFunction(TimeDomain timeDomain) { + this.timeDomain = timeDomain; + } + + @Override + public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception { + out.collect(value); + if (timeDomain.equals(TimeDomain.EVENT_TIME)) { + ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); + } else { + ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); + } + } + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector<Integer> out) throws Exception { + + assertEquals(this.timeDomain, ctx.timeDomain()); + out.collect(1777); + } + } + + private static class TriggeringStatefulFlatMapFunction extends RichProcessFunction<Integer, String> { + + private static final long serialVersionUID = 1L; + + private final ValueStateDescriptor<Integer> state = + new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null); + + private final TimeDomain timeDomain; + + public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) { + this.timeDomain = timeDomain; + } + + @Override + public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception { + out.collect("INPUT:" + value); + getRuntimeContext().getState(state).update(value); + if (timeDomain.equals(TimeDomain.EVENT_TIME)) { + ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); + } else { + ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); + } + } + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector<String> out) throws Exception { + assertEquals(this.timeDomain, ctx.timeDomain()); + out.collect("STATE:" + getRuntimeContext().getState(state).value()); + } + } + + private static class BothTriggeringFlatMapFunction implements ProcessFunction<Integer, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception { + ctx.timerService().registerProcessingTimeTimer(5); + ctx.timerService().registerEventTimeTimer(6); + } + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector<String> out) throws Exception { + if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) { + out.collect("EVENT:1777"); + } else { + out.collect("PROC:1777"); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java deleted file mode 100644 index 6080ddc..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java +++ /dev/null @@ -1,404 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.api.operators; - - -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.TimeDomain; -import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction; -import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; -import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.TestHarnessUtil; -import org.apache.flink.util.Collector; -import org.apache.flink.util.TestLogger; -import org.junit.Test; - -import java.util.concurrent.ConcurrentLinkedQueue; - -import static org.junit.Assert.assertEquals; - -/** - * Tests {@link StreamTimelyFlatMap}. - */ -public class TimelyFlatMapTest extends TestLogger { - - @Test - public void testTimestampAndWatermarkQuerying() throws Exception { - - StreamTimelyFlatMap<Integer, Integer, String> operator = - new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME)); - - OneInputStreamOperatorTestHarness<Integer, String> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.processWatermark(new Watermark(17)); - testHarness.processElement(new StreamRecord<>(5, 12L)); - - testHarness.processWatermark(new Watermark(42)); - testHarness.processElement(new StreamRecord<>(6, 13L)); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new Watermark(17L)); - expectedOutput.add(new StreamRecord<>("5TIME:17 TS:12", 12L)); - expectedOutput.add(new Watermark(42L)); - expectedOutput.add(new StreamRecord<>("6TIME:42 TS:13", 13L)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - @Test - public void testTimestampAndProcessingTimeQuerying() throws Exception { - - StreamTimelyFlatMap<Integer, Integer, String> operator = - new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME)); - - OneInputStreamOperatorTestHarness<Integer, String> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.setProcessingTime(17); - testHarness.processElement(new StreamRecord<>(5)); - - testHarness.setProcessingTime(42); - testHarness.processElement(new StreamRecord<>(6)); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>("5TIME:17 TS:null")); - expectedOutput.add(new StreamRecord<>("6TIME:42 TS:null")); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - @Test - public void testEventTimeTimers() throws Exception { - - StreamTimelyFlatMap<Integer, Integer, Integer> operator = - new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME)); - - OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.processWatermark(new Watermark(0)); - - testHarness.processElement(new StreamRecord<>(17, 42L)); - - testHarness.processWatermark(new Watermark(5)); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new Watermark(0L)); - expectedOutput.add(new StreamRecord<>(17, 42L)); - expectedOutput.add(new StreamRecord<>(1777, 5L)); - expectedOutput.add(new Watermark(5L)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - @Test - public void testProcessingTimeTimers() throws Exception { - - StreamTimelyFlatMap<Integer, Integer, Integer> operator = - new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME)); - - OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.processElement(new StreamRecord<>(17)); - - testHarness.setProcessingTime(5); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>(17)); - expectedOutput.add(new StreamRecord<>(1777, 5L)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - /** - * Verifies that we don't have leakage between different keys. - */ - @Test - public void testEventTimeTimerWithState() throws Exception { - - StreamTimelyFlatMap<Integer, Integer, String> operator = - new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME)); - - OneInputStreamOperatorTestHarness<Integer, String> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.processWatermark(new Watermark(1)); - testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6 - - testHarness.processWatermark(new Watermark(2)); - testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7 - - testHarness.processWatermark(new Watermark(6)); - testHarness.processWatermark(new Watermark(7)); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new Watermark(1L)); - expectedOutput.add(new StreamRecord<>("INPUT:17", 0L)); - expectedOutput.add(new Watermark(2L)); - expectedOutput.add(new StreamRecord<>("INPUT:42", 1L)); - expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); - expectedOutput.add(new Watermark(6L)); - expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); - expectedOutput.add(new Watermark(7L)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - /** - * Verifies that we don't have leakage between different keys. - */ - @Test - public void testProcessingTimeTimerWithState() throws Exception { - - StreamTimelyFlatMap<Integer, Integer, String> operator = - new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME)); - - OneInputStreamOperatorTestHarness<Integer, String> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.setProcessingTime(1); - testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6 - - testHarness.setProcessingTime(2); - testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7 - - testHarness.setProcessingTime(6); - testHarness.setProcessingTime(7); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>("INPUT:17")); - expectedOutput.add(new StreamRecord<>("INPUT:42")); - expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); - expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - @Test - public void testSnapshotAndRestore() throws Exception { - - StreamTimelyFlatMap<Integer, Integer, String> operator = - new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction()); - - OneInputStreamOperatorTestHarness<Integer, String> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.processElement(new StreamRecord<>(5, 12L)); - - // snapshot and restore from scratch - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); - - testHarness.close(); - - operator = new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction()); - - testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); - - testHarness.setup(); - testHarness.initializeState(snapshot); - testHarness.open(); - - testHarness.setProcessingTime(5); - testHarness.processWatermark(new Watermark(6)); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>("PROC:1777", 5L)); - expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L)); - expectedOutput.add(new Watermark(6)); - - System.out.println("GOT: " + testHarness.getOutput()); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - private static class IdentityKeySelector<T> implements KeySelector<T, T> { - private static final long serialVersionUID = 1L; - - @Override - public T getKey(T value) throws Exception { - return value; - } - } - - private static class QueryingFlatMapFunction implements TimelyFlatMapFunction<Integer, String> { - - private static final long serialVersionUID = 1L; - - private final TimeDomain timeDomain; - - public QueryingFlatMapFunction(TimeDomain timeDomain) { - this.timeDomain = timeDomain; - } - - @Override - public void flatMap(Integer value, Context ctx, Collector<String> out) throws Exception { - if (timeDomain.equals(TimeDomain.EVENT_TIME)) { - out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()); - } else { - out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()); - } - } - - @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector<String> out) throws Exception { - } - } - - private static class TriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, Integer> { - - private static final long serialVersionUID = 1L; - - private final TimeDomain timeDomain; - - public TriggeringFlatMapFunction(TimeDomain timeDomain) { - this.timeDomain = timeDomain; - } - - @Override - public void flatMap(Integer value, Context ctx, Collector<Integer> out) throws Exception { - out.collect(value); - if (timeDomain.equals(TimeDomain.EVENT_TIME)) { - ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); - } else { - ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); - } - } - - @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector<Integer> out) throws Exception { - - assertEquals(this.timeDomain, ctx.timeDomain()); - out.collect(1777); - } - } - - private static class TriggeringStatefulFlatMapFunction extends RichTimelyFlatMapFunction<Integer, String> { - - private static final long serialVersionUID = 1L; - - private final ValueStateDescriptor<Integer> state = - new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null); - - private final TimeDomain timeDomain; - - public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) { - this.timeDomain = timeDomain; - } - - @Override - public void flatMap(Integer value, Context ctx, Collector<String> out) throws Exception { - out.collect("INPUT:" + value); - getRuntimeContext().getState(state).update(value); - if (timeDomain.equals(TimeDomain.EVENT_TIME)) { - ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); - } else { - ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); - } - } - - @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector<String> out) throws Exception { - assertEquals(this.timeDomain, ctx.timeDomain()); - out.collect("STATE:" + getRuntimeContext().getState(state).value()); - } - } - - private static class BothTriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, String> { - - private static final long serialVersionUID = 1L; - - @Override - public void flatMap(Integer value, Context ctx, Collector<String> out) throws Exception { - ctx.timerService().registerProcessingTimeTimer(5); - ctx.timerService().registerEventTimeTimer(6); - } - - @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector<String> out) throws Exception { - if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) { - out.collect("EVENT:1777"); - } else { - out.collect("PROC:1777"); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java new file mode 100644 index 0000000..a449359 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java @@ -0,0 +1,536 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators.co; + + +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction; +import org.apache.flink.streaming.api.functions.co.CoProcessFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; + +/** + * Tests {@link CoProcessOperator}. + */ +public class CoProcessOperatorTest extends TestLogger { + + @Test + public void testTimestampAndWatermarkQuerying() throws Exception { + + CoProcessOperator<String, Integer, String, String> operator = + new CoProcessOperator<>(new WatermarkQueryingProcessFunction()); + + TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<String>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark1(new Watermark(17)); + testHarness.processWatermark2(new Watermark(17)); + testHarness.processElement1(new StreamRecord<>(5, 12L)); + + testHarness.processWatermark1(new Watermark(42)); + testHarness.processWatermark2(new Watermark(42)); + testHarness.processElement2(new StreamRecord<>("6", 13L)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(17L)); + expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L)); + expectedOutput.add(new Watermark(42L)); + expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testTimestampAndProcessingTimeQuerying() throws Exception { + + CoProcessOperator<String, Integer, String, String> operator = + new CoProcessOperator<>(new ProcessingTimeQueryingProcessFunction()); + + TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<String>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(17); + testHarness.processElement1(new StreamRecord<>(5)); + + testHarness.setProcessingTime(42); + testHarness.processElement2(new StreamRecord<>("6")); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("5PT:17 TS:null")); + expectedOutput.add(new StreamRecord<>("6PT:42 TS:null")); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testEventTimeTimers() throws Exception { + + CoProcessOperator<String, Integer, String, String> operator = + new CoProcessOperator<>(new EventTimeTriggeringProcessFunction()); + + TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<String>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement1(new StreamRecord<>(17, 42L)); + testHarness.processElement2(new StreamRecord<>("18", 42L)); + + testHarness.processWatermark1(new Watermark(5)); + testHarness.processWatermark2(new Watermark(5)); + + testHarness.processWatermark1(new Watermark(6)); + testHarness.processWatermark2(new Watermark(6)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L)); + expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L)); + expectedOutput.add(new StreamRecord<>("1777", 5L)); + expectedOutput.add(new Watermark(5L)); + expectedOutput.add(new StreamRecord<>("1777", 6L)); + expectedOutput.add(new Watermark(6L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testProcessingTimeTimers() throws Exception { + + CoProcessOperator<String, Integer, String, String> operator = + new CoProcessOperator<>(new ProcessingTimeTriggeringProcessFunction()); + + TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<String>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement1(new StreamRecord<>(17)); + testHarness.processElement2(new StreamRecord<>("18")); + + testHarness.setProcessingTime(5); + testHarness.setProcessingTime(6); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("INPUT1:17")); + expectedOutput.add(new StreamRecord<>("INPUT2:18")); + expectedOutput.add(new StreamRecord<>("1777", 5L)); + expectedOutput.add(new StreamRecord<>("1777", 6L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + /** + * Verifies that we don't have leakage between different keys. + */ + @Test + public void testEventTimeTimerWithState() throws Exception { + + CoProcessOperator<String, Integer, String, String> operator = + new CoProcessOperator<>(new EventTimeTriggeringStatefulProcessFunction()); + + TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<String>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + testHarness.processElement1(new StreamRecord<>(17, 0L)); // should set timer for 6 + + testHarness.processWatermark1(new Watermark(2)); + testHarness.processWatermark2(new Watermark(2)); + testHarness.processElement2(new StreamRecord<>("42", 1L)); // should set timer for 7 + + testHarness.processWatermark1(new Watermark(6)); + testHarness.processWatermark2(new Watermark(6)); + + testHarness.processWatermark1(new Watermark(7)); + testHarness.processWatermark2(new Watermark(7)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(1L)); + expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L)); + expectedOutput.add(new Watermark(2L)); + expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L)); + expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); + expectedOutput.add(new Watermark(6L)); + expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); + expectedOutput.add(new Watermark(7L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + /** + * Verifies that we don't have leakage between different keys. + */ + @Test + public void testProcessingTimeTimerWithState() throws Exception { + + CoProcessOperator<String, Integer, String, String> operator = + new CoProcessOperator<>(new ProcessingTimeTriggeringStatefulProcessFunction()); + + TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<String>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(1); + testHarness.processElement1(new StreamRecord<>(17)); // should set timer for 6 + + testHarness.setProcessingTime(2); + testHarness.processElement2(new StreamRecord<>("42")); // should set timer for 7 + + testHarness.setProcessingTime(6); + testHarness.setProcessingTime(7); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("INPUT1:17")); + expectedOutput.add(new StreamRecord<>("INPUT2:42")); + expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); + expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testSnapshotAndRestore() throws Exception { + + CoProcessOperator<String, Integer, String, String> operator = + new CoProcessOperator<>(new BothTriggeringProcessFunction()); + + TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<String>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement1(new StreamRecord<>(5, 12L)); + testHarness.processElement2(new StreamRecord<>("5", 12L)); + + // snapshot and restore from scratch + OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + + testHarness.close(); + + operator = new CoProcessOperator<>(new BothTriggeringProcessFunction()); + + testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<String>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + + testHarness.setProcessingTime(5); + testHarness.processWatermark1(new Watermark(6)); + testHarness.processWatermark2(new Watermark(6)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("PROC:1777", 5L)); + expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L)); + expectedOutput.add(new Watermark(6)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + + private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> { + private static final long serialVersionUID = 1L; + + @Override + public String getKey(Integer value) throws Exception { + return "" + value; + } + } + + private static class IdentityKeySelector<T> implements KeySelector<T, T> { + private static final long serialVersionUID = 1L; + + @Override + public T getKey(T value) throws Exception { + return value; + } + } + + private static class WatermarkQueryingProcessFunction implements CoProcessFunction<Integer, String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception { + out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()); + } + + @Override + public void processElement2(String value, Context ctx, Collector<String> out) throws Exception { + out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()); + } + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector<String> out) throws Exception { + } + } + + private static class EventTimeTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception { + out.collect("INPUT1:" + value); + ctx.timerService().registerEventTimeTimer(5); + } + + @Override + public void processElement2(String value, Context ctx, Collector<String> out) throws Exception { + out.collect("INPUT2:" + value); + ctx.timerService().registerEventTimeTimer(6); + } + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector<String> out) throws Exception { + + assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain()); + out.collect("" + 1777); + } + } + + private static class EventTimeTriggeringStatefulProcessFunction extends RichCoProcessFunction<Integer, String, String> { + + private static final long serialVersionUID = 1L; + + private final ValueStateDescriptor<String> state = + new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null); + + @Override + public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception { + out.collect("INPUT1:" + value); + getRuntimeContext().getState(state).update("" + value); + ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); + } + + @Override + public void processElement2(String value, Context ctx, Collector<String> out) throws Exception { + out.collect("INPUT2:" + value); + getRuntimeContext().getState(state).update(value); + ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); + } + + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector<String> out) throws Exception { + assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain()); + out.collect("STATE:" + getRuntimeContext().getState(state).value()); + } + } + + private static class ProcessingTimeTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception { + out.collect("INPUT1:" + value); + ctx.timerService().registerProcessingTimeTimer(5); + } + + @Override + public void processElement2(String value, Context ctx, Collector<String> out) throws Exception { + out.collect("INPUT2:" + value); + ctx.timerService().registerProcessingTimeTimer(6); + } + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector<String> out) throws Exception { + + assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain()); + out.collect("" + 1777); + } + } + + private static class ProcessingTimeQueryingProcessFunction implements CoProcessFunction<Integer, String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception { + out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()); + } + + @Override + public void processElement2(String value, Context ctx, Collector<String> out) throws Exception { + out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()); + } + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector<String> out) throws Exception { + } + } + + private static class ProcessingTimeTriggeringStatefulProcessFunction extends RichCoProcessFunction<Integer, String, String> { + + private static final long serialVersionUID = 1L; + + private final ValueStateDescriptor<String> state = + new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null); + + @Override + public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception { + out.collect("INPUT1:" + value); + getRuntimeContext().getState(state).update("" + value); + ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); + } + + @Override + public void processElement2(String value, Context ctx, Collector<String> out) throws Exception { + out.collect("INPUT2:" + value); + getRuntimeContext().getState(state).update(value); + ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); + } + + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector<String> out) throws Exception { + assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain()); + out.collect("STATE:" + getRuntimeContext().getState(state).value()); + } + } + + private static class BothTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception { + ctx.timerService().registerEventTimeTimer(6); + } + + @Override + public void processElement2(String value, Context ctx, Collector<String> out) throws Exception { + ctx.timerService().registerProcessingTimeTimer(5); + } + + + @Override + public void onTimer( + long timestamp, + OnTimerContext ctx, + Collector<String> out) throws Exception { + if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) { + out.collect("EVENT:1777"); + } else { + out.collect("PROC:1777"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java deleted file mode 100644 index 7c29631..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java +++ /dev/null @@ -1,536 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.api.operators.co; - - -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.streaming.api.TimeDomain; -import org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction; -import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; -import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.TestHarnessUtil; -import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; -import org.apache.flink.util.Collector; -import org.apache.flink.util.TestLogger; -import org.junit.Test; - -import java.util.concurrent.ConcurrentLinkedQueue; - -import static org.junit.Assert.assertEquals; - -/** - * Tests {@link CoStreamTimelyFlatMap}. - */ -public class TimelyCoFlatMapTest extends TestLogger { - - @Test - public void testTimestampAndWatermarkQuerying() throws Exception { - - CoStreamTimelyFlatMap<String, Integer, String, String> operator = - new CoStreamTimelyFlatMap<>(new WatermarkQueryingFlatMapFunction()); - - TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( - operator, - new IntToStringKeySelector<>(), - new IdentityKeySelector<String>(), - BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.processWatermark1(new Watermark(17)); - testHarness.processWatermark2(new Watermark(17)); - testHarness.processElement1(new StreamRecord<>(5, 12L)); - - testHarness.processWatermark1(new Watermark(42)); - testHarness.processWatermark2(new Watermark(42)); - testHarness.processElement2(new StreamRecord<>("6", 13L)); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new Watermark(17L)); - expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L)); - expectedOutput.add(new Watermark(42L)); - expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - @Test - public void testTimestampAndProcessingTimeQuerying() throws Exception { - - CoStreamTimelyFlatMap<String, Integer, String, String> operator = - new CoStreamTimelyFlatMap<>(new ProcessingTimeQueryingFlatMapFunction()); - - TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( - operator, - new IntToStringKeySelector<>(), - new IdentityKeySelector<String>(), - BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.setProcessingTime(17); - testHarness.processElement1(new StreamRecord<>(5)); - - testHarness.setProcessingTime(42); - testHarness.processElement2(new StreamRecord<>("6")); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>("5PT:17 TS:null")); - expectedOutput.add(new StreamRecord<>("6PT:42 TS:null")); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - @Test - public void testEventTimeTimers() throws Exception { - - CoStreamTimelyFlatMap<String, Integer, String, String> operator = - new CoStreamTimelyFlatMap<>(new EventTimeTriggeringFlatMapFunction()); - - TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( - operator, - new IntToStringKeySelector<>(), - new IdentityKeySelector<String>(), - BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.processElement1(new StreamRecord<>(17, 42L)); - testHarness.processElement2(new StreamRecord<>("18", 42L)); - - testHarness.processWatermark1(new Watermark(5)); - testHarness.processWatermark2(new Watermark(5)); - - testHarness.processWatermark1(new Watermark(6)); - testHarness.processWatermark2(new Watermark(6)); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L)); - expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L)); - expectedOutput.add(new StreamRecord<>("1777", 5L)); - expectedOutput.add(new Watermark(5L)); - expectedOutput.add(new StreamRecord<>("1777", 6L)); - expectedOutput.add(new Watermark(6L)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - @Test - public void testProcessingTimeTimers() throws Exception { - - CoStreamTimelyFlatMap<String, Integer, String, String> operator = - new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringFlatMapFunction()); - - TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( - operator, - new IntToStringKeySelector<>(), - new IdentityKeySelector<String>(), - BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.processElement1(new StreamRecord<>(17)); - testHarness.processElement2(new StreamRecord<>("18")); - - testHarness.setProcessingTime(5); - testHarness.setProcessingTime(6); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>("INPUT1:17")); - expectedOutput.add(new StreamRecord<>("INPUT2:18")); - expectedOutput.add(new StreamRecord<>("1777", 5L)); - expectedOutput.add(new StreamRecord<>("1777", 6L)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - /** - * Verifies that we don't have leakage between different keys. - */ - @Test - public void testEventTimeTimerWithState() throws Exception { - - CoStreamTimelyFlatMap<String, Integer, String, String> operator = - new CoStreamTimelyFlatMap<>(new EventTimeTriggeringStatefulFlatMapFunction()); - - TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( - operator, - new IntToStringKeySelector<>(), - new IdentityKeySelector<String>(), - BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.processWatermark1(new Watermark(1)); - testHarness.processWatermark2(new Watermark(1)); - testHarness.processElement1(new StreamRecord<>(17, 0L)); // should set timer for 6 - - testHarness.processWatermark1(new Watermark(2)); - testHarness.processWatermark2(new Watermark(2)); - testHarness.processElement2(new StreamRecord<>("42", 1L)); // should set timer for 7 - - testHarness.processWatermark1(new Watermark(6)); - testHarness.processWatermark2(new Watermark(6)); - - testHarness.processWatermark1(new Watermark(7)); - testHarness.processWatermark2(new Watermark(7)); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new Watermark(1L)); - expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L)); - expectedOutput.add(new Watermark(2L)); - expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L)); - expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); - expectedOutput.add(new Watermark(6L)); - expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); - expectedOutput.add(new Watermark(7L)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - /** - * Verifies that we don't have leakage between different keys. - */ - @Test - public void testProcessingTimeTimerWithState() throws Exception { - - CoStreamTimelyFlatMap<String, Integer, String, String> operator = - new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringStatefulFlatMapFunction()); - - TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( - operator, - new IntToStringKeySelector<>(), - new IdentityKeySelector<String>(), - BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.setProcessingTime(1); - testHarness.processElement1(new StreamRecord<>(17)); // should set timer for 6 - - testHarness.setProcessingTime(2); - testHarness.processElement2(new StreamRecord<>("42")); // should set timer for 7 - - testHarness.setProcessingTime(6); - testHarness.setProcessingTime(7); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>("INPUT1:17")); - expectedOutput.add(new StreamRecord<>("INPUT2:42")); - expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); - expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - @Test - public void testSnapshotAndRestore() throws Exception { - - CoStreamTimelyFlatMap<String, Integer, String, String> operator = - new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction()); - - TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = - new KeyedTwoInputStreamOperatorTestHarness<>( - operator, - new IntToStringKeySelector<>(), - new IdentityKeySelector<String>(), - BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.setup(); - testHarness.open(); - - testHarness.processElement1(new StreamRecord<>(5, 12L)); - testHarness.processElement2(new StreamRecord<>("5", 12L)); - - // snapshot and restore from scratch - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); - - testHarness.close(); - - operator = new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction()); - - testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( - operator, - new IntToStringKeySelector<>(), - new IdentityKeySelector<String>(), - BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.setup(); - testHarness.initializeState(snapshot); - testHarness.open(); - - testHarness.setProcessingTime(5); - testHarness.processWatermark1(new Watermark(6)); - testHarness.processWatermark2(new Watermark(6)); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>("PROC:1777", 5L)); - expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L)); - expectedOutput.add(new Watermark(6)); - - TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); - - testHarness.close(); - } - - - private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> { - private static final long serialVersionUID = 1L; - - @Override - public String getKey(Integer value) throws Exception { - return "" + value; - } - } - - private static class IdentityKeySelector<T> implements KeySelector<T, T> { - private static final long serialVersionUID = 1L; - - @Override - public T getKey(T value) throws Exception { - return value; - } - } - - private static class WatermarkQueryingFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> { - - private static final long serialVersionUID = 1L; - - @Override - public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception { - out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()); - } - - @Override - public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception { - out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()); - } - - @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector<String> out) throws Exception { - } - } - - private static class EventTimeTriggeringFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> { - - private static final long serialVersionUID = 1L; - - @Override - public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception { - out.collect("INPUT1:" + value); - ctx.timerService().registerEventTimeTimer(5); - } - - @Override - public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception { - out.collect("INPUT2:" + value); - ctx.timerService().registerEventTimeTimer(6); - } - - @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector<String> out) throws Exception { - - assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain()); - out.collect("" + 1777); - } - } - - private static class EventTimeTriggeringStatefulFlatMapFunction extends RichTimelyCoFlatMapFunction<Integer, String, String> { - - private static final long serialVersionUID = 1L; - - private final ValueStateDescriptor<String> state = - new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null); - - @Override - public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception { - out.collect("INPUT1:" + value); - getRuntimeContext().getState(state).update("" + value); - ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); - } - - @Override - public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception { - out.collect("INPUT2:" + value); - getRuntimeContext().getState(state).update(value); - ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); - } - - - @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector<String> out) throws Exception { - assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain()); - out.collect("STATE:" + getRuntimeContext().getState(state).value()); - } - } - - private static class ProcessingTimeTriggeringFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> { - - private static final long serialVersionUID = 1L; - - @Override - public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception { - out.collect("INPUT1:" + value); - ctx.timerService().registerProcessingTimeTimer(5); - } - - @Override - public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception { - out.collect("INPUT2:" + value); - ctx.timerService().registerProcessingTimeTimer(6); - } - - @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector<String> out) throws Exception { - - assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain()); - out.collect("" + 1777); - } - } - - private static class ProcessingTimeQueryingFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> { - - private static final long serialVersionUID = 1L; - - @Override - public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception { - out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()); - } - - @Override - public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception { - out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()); - } - - @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector<String> out) throws Exception { - } - } - - private static class ProcessingTimeTriggeringStatefulFlatMapFunction extends RichTimelyCoFlatMapFunction<Integer, String, String> { - - private static final long serialVersionUID = 1L; - - private final ValueStateDescriptor<String> state = - new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null); - - @Override - public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception { - out.collect("INPUT1:" + value); - getRuntimeContext().getState(state).update("" + value); - ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); - } - - @Override - public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception { - out.collect("INPUT2:" + value); - getRuntimeContext().getState(state).update(value); - ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); - } - - - @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector<String> out) throws Exception { - assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain()); - out.collect("STATE:" + getRuntimeContext().getState(state).value()); - } - } - - private static class BothTriggeringFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> { - - private static final long serialVersionUID = 1L; - - @Override - public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception { - ctx.timerService().registerEventTimeTimer(6); - } - - @Override - public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception { - ctx.timerService().registerProcessingTimeTimer(5); - } - - - @Override - public void onTimer( - long timestamp, - OnTimerContext ctx, - Collector<String> out) throws Exception { - if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) { - out.collect("EVENT:1777"); - } else { - out.collect("PROC:1777"); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala index 50526b5..a7325a4 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala @@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.java.typeutils.ResultTypeQueryable import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream} -import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, TimelyCoFlatMapFunction} +import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoProcessFunction, RichCoProcessFunction} import org.apache.flink.streaming.api.operators.TwoInputStreamOperator import org.apache.flink.util.Collector @@ -101,30 +101,33 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { } /** - * Applies the given [[TimelyCoFlatMapFunction]] on the connected input streams, + * Applies the given [[CoProcessFunction]] on the connected input streams, * thereby creating a transformed output stream. * - * The function will be called for every element in the streams and can produce - * zero or more output. The function can also query the time and set timers. When - * reacting to the firing of set timers the function can emit yet more elements. + * The function will be called for every element in the input streams and can produce zero + * or more output elements. Contrary to the [[flatMap(CoFlatMapFunction)]] function, + * this function can also query the time and set timers. When reacting to the firing of set + * timers the function can directly emit elements and/or register yet more timers. * - * A [[org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction]] + * A [[RichCoProcessFunction]] * can be used to gain access to features provided by the * [[org.apache.flink.api.common.functions.RichFunction]] interface. * - * @param coFlatMapper The [[TimelyCoFlatMapFunction]] that is called for each element - * in the stream. - * - * @return The transformed { @link DataStream}. + * @param coProcessFunction The [[CoProcessFunction]] that is called for each element + * in the stream. + * @return The transformed [[DataStream]]. */ - def flatMap[R: TypeInformation]( - coFlatMapper: TimelyCoFlatMapFunction[IN1, IN2, R]) : DataStream[R] = { + @PublicEvolving + def process[R: TypeInformation]( + coProcessFunction: CoProcessFunction[IN1, IN2, R]) : DataStream[R] = { - if (coFlatMapper == null) throw new NullPointerException("FlatMap function must not be null.") + if (coProcessFunction == null) { + throw new NullPointerException("CoProcessFunction function must not be null.") + } val outType : TypeInformation[R] = implicitly[TypeInformation[R]] - asScalaStream(javaStream.flatMap(coFlatMapper, outType)) + asScalaStream(javaStream.process(coProcessFunction, outType)) } @@ -144,14 +147,14 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { * @return * The resulting data stream. */ - def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): + def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): DataStream[R] = { - + if (coFlatMapper == null) { throw new NullPointerException("FlatMap function must not be null.") } - - val outType : TypeInformation[R] = implicitly[TypeInformation[R]] + + val outType : TypeInformation[R] = implicitly[TypeInformation[R]] asScalaStream(javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]]) } http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala index 66d80c2..f2999b3 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala @@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescr import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.streaming.api.datastream.{QueryableStateStream, SingleOutputStreamOperator, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream} -import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction +import org.apache.flink.streaming.api.functions.{ProcessFunction, RichProcessFunction} import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} import org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator, QueryableValueStateOperator} @@ -54,28 +54,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] // ------------------------------------------------------------------------ /** - * Applies the given [[TimelyFlatMapFunction]] on the input stream, thereby + * Applies the given [[ProcessFunction]] on the input stream, thereby * creating a transformed output stream. * * The function will be called for every element in the stream and can produce * zero or more output. The function can also query the time and set timers. When * reacting to the firing of set timers the function can emit yet more elements. * - * A [[org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction]] + * The function will be called for every element in the input streams and can produce zero + * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]] + * function, this function can also query the time and set timers. When reacting to the firing + * of set timers the function can directly emit elements and/or register yet more timers. + * + * A [[RichProcessFunction]] * can be used to gain access to features provided by the * [[org.apache.flink.api.common.functions.RichFunction]] * - * @param flatMapper The [[TimelyFlatMapFunction]] that is called for each element + * @param processFunction The [[ProcessFunction]] that is called for each element * in the stream. */ - def flatMap[R: TypeInformation]( - flatMapper: TimelyFlatMapFunction[T, R]): DataStream[R] = { + @PublicEvolving + def process[R: TypeInformation]( + processFunction: ProcessFunction[T, R]): DataStream[R] = { - if (flatMapper == null) { - throw new NullPointerException("TimelyFlatMapFunction must not be null.") + if (processFunction == null) { + throw new NullPointerException("ProcessFunction must not be null.") } - asScalaStream(javaStream.flatMap(flatMapper, implicitly[TypeInformation[R]])) + asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]])) } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index 967142b..adb59f2 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -23,11 +23,11 @@ import java.lang import org.apache.flink.api.common.functions._ import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.streaming.api.collector.selector.OutputSelector -import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction -import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction.{Context, OnTimerContext} +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext} import org.apache.flink.streaming.api.functions.co.CoMapFunction import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph} -import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator, StreamTimelyFlatMap} +import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, ProcessOperator, StreamOperator} import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger} import org.apache.flink.streaming.api.windowing.windows.GlobalWindow @@ -318,26 +318,26 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { } /** - * Verify that a timely flat map call is correctly translated to an operator. + * Verify that a [[KeyedStream.process()]] call is correctly translated to an operator. */ @Test - def testTimelyFlatMapTranslation(): Unit = { + def testProcessTranslation(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val src = env.generateSequence(0, 0) - val timelyFlatMapFunction = new TimelyFlatMapFunction[Long, Int] { - override def flatMap(value: Long, ctx: Context, out: Collector[Int]): Unit = ??? + val processFunction = new ProcessFunction[Long, Int] { + override def processElement(value: Long, ctx: Context, out: Collector[Int]): Unit = ??? override def onTimer( timestamp: Long, ctx: OnTimerContext, out: Collector[Int]): Unit = ??? } - val flatMapped = src.keyBy(x => x).flatMap(timelyFlatMapFunction) + val flatMapped = src.keyBy(x => x).process(processFunction) - assert(timelyFlatMapFunction == getFunctionForDataStream(flatMapped)) - assert(getOperatorForDataStream(flatMapped).isInstanceOf[StreamTimelyFlatMap[_, _, _]]) + assert(processFunction == getFunctionForDataStream(flatMapped)) + assert(getOperatorForDataStream(flatMapped).isInstanceOf[ProcessOperator[_, _, _]]) } @Test def operatorTest() {
