[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15588193#comment-15588193 ]
ASF GitHub Bot commented on FLINK-3674: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2570#discussion_r84031809 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.operators; + + +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction; +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; + +/** + * Tests {@link StreamTimelyFlatMap}. 
+ */ +public class TimelyFlatMapTest extends TestLogger { + + @Test + public void testCurrentEventTime() throws Exception { + + StreamTimelyFlatMap<Integer, Integer, String> operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction()); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark(new Watermark(17)); + testHarness.processElement(new StreamRecord<>(5, 12L)); + + testHarness.processWatermark(new Watermark(42)); + testHarness.processElement(new StreamRecord<>(6, 13L)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(17L)); + expectedOutput.add(new StreamRecord<>("5WM:17", 12L)); + expectedOutput.add(new Watermark(42L)); + expectedOutput.add(new StreamRecord<>("6WM:42", 13L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testCurrentProcessingTime() throws Exception { + + StreamTimelyFlatMap<Integer, Integer, String> operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction()); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(17); + testHarness.processElement(new StreamRecord<>(5)); + + testHarness.setProcessingTime(42); + testHarness.processElement(new StreamRecord<>(6)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("5PT:17")); + expectedOutput.add(new StreamRecord<>("6PT:42")); 
+ + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testEventTimeTimers() throws Exception { + + StreamTimelyFlatMap<Integer, Integer, Integer> operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new EventTimeTriggeringFlatMapFunction()); + + OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(17, 42L)); + + testHarness.processWatermark(new Watermark(5)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>(17, 42L)); + expectedOutput.add(new StreamRecord<>(1777, 5L)); + expectedOutput.add(new Watermark(5L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testProcessingTimeTimers() throws Exception { + + StreamTimelyFlatMap<Integer, Integer, Integer> operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeTriggeringFlatMapFunction()); + + OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(17)); + + testHarness.setProcessingTime(5); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>(17)); + expectedOutput.add(new StreamRecord<>(1777, 5L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + /** + * Verifies that we don't 
have leakage between different keys. + */ + @Test + public void testEventTimeTimerWithState() throws Exception { + + StreamTimelyFlatMap<Integer, Integer, String> operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new EventTimeTriggeringStatefulFlatMapFunction()); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark(new Watermark(1)); + testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6 + + testHarness.processWatermark(new Watermark(2)); + testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7 + + testHarness.processWatermark(new Watermark(6)); + testHarness.processWatermark(new Watermark(7)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(1L)); + expectedOutput.add(new StreamRecord<>("INPUT:17", 0L)); + expectedOutput.add(new Watermark(2L)); + expectedOutput.add(new StreamRecord<>("INPUT:42", 1L)); + expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); + expectedOutput.add(new Watermark(6L)); + expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); + expectedOutput.add(new Watermark(7L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + /** + * Verifies that we don't have leakage between different keys. 
+ */ + @Test + public void testProcessingTimeTimerWithState() throws Exception { + + StreamTimelyFlatMap<Integer, Integer, String> operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeTriggeringStatefulFlatMapFunction()); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(1); + testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6 + + testHarness.setProcessingTime(2); + testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7 + + testHarness.setProcessingTime(6); + testHarness.setProcessingTime(7); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("INPUT:17")); + expectedOutput.add(new StreamRecord<>("INPUT:42")); + expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); + expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testSnapshotAndRestore() throws Exception { + + StreamTimelyFlatMap<Integer, Integer, String> operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction()); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(5, 12L)); + + // snapshot and restore from scratch + StreamStateHandle snapshot = testHarness.snapshot(0, 0); + + testHarness.close(); + + operator = new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction()); + + 
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.restore(snapshot); + testHarness.open(); + + testHarness.setProcessingTime(5); + testHarness.processWatermark(new Watermark(6)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("PROC:1777", 5L)); + expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L)); + expectedOutput.add(new Watermark(6)); + + System.out.println("GOT: " + testHarness.getOutput()); --- End diff -- done. > Add an interface for Time aware User Functions > ---------------------------------------------- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming > Affects Versions: 1.0.0 > Reporter: Stephan Ewen > Assignee: Aljoscha Krettek > > I suggest adding an interface that UDFs can implement, which will let them be > notified upon watermark updates. > Example usage: > {code} > public interface EventTimeFunction { > void onWatermark(Watermark watermark); > } > public class MyMapper implements MapFunction<String, String>, > EventTimeFunction { > private long currentEventTime = Long.MIN_VALUE; > public String map(String value) { > return value + " @ " + currentEventTime; > } > public void onWatermark(Watermark watermark) { > currentEventTime = watermark.getTimestamp(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)