This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a7e15262cdf33d9587b2cb898ca7384fa5903824 Author: liliwei <[email protected]> AuthorDate: Thu Sep 2 03:16:54 2021 +0800 [FLINK-20443][API/DataStream] ContinuousProcessingTimeTrigger doesn't fire at the end of the window --- .../triggers/ContinuousProcessingTimeTrigger.java | 30 ++++---- .../ContinuousProcessingTimeTriggerTest.java | 82 +++++++++++++++++++++- 2 files changed, 97 insertions(+), 15 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java index c437ba7..e3b1325 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java @@ -50,18 +50,13 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O @Override public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { - ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); + ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc); timestamp = ctx.getCurrentProcessingTime(); - if (fireTimestamp.get() == null) { - long start = timestamp - (timestamp % interval); - long nextFireTimestamp = start + interval; - - ctx.registerProcessingTimeTimer(nextFireTimestamp); - - fireTimestamp.add(nextFireTimestamp); - return TriggerResult.CONTINUE; + if (fireTimestampState.get() == null) { + registerNextFireTimestamp( + timestamp - (timestamp % interval), window, ctx, fireTimestampState); } return TriggerResult.CONTINUE; } @@ -74,12 +69,11 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O @Override public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { - ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); + ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc); - if (fireTimestamp.get().equals(time)) { - fireTimestamp.clear(); - fireTimestamp.add(time + interval); - ctx.registerProcessingTimeTimer(time + interval); + if (fireTimestampState.get().equals(time)) { + fireTimestampState.clear(); + registerNextFireTimestamp(time, window, ctx, fireTimestampState); return TriggerResult.FIRE; } return TriggerResult.CONTINUE; @@ -141,4 +135,12 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O return Math.min(value1, value2); } } + + private void registerNextFireTimestamp( + long time, W window, TriggerContext ctx, ReducingState<Long> fireTimestampState) + throws Exception { + long nextFireTimestamp = Math.min(time + interval, window.maxTimestamp()); + fireTimestampState.add(nextFireTimestamp); + ctx.registerProcessingTimeTimer(nextFireTimestamp); + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java index a50ee92..e0b58ea 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ContinuousProcessingTimeTriggerTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.functions.NullByteKeySelector; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -93,6 +94,84 @@ public class ContinuousProcessingTimeTriggerTest { } } + /** Verify ContinuousProcessingTimeTrigger fire. */ + @Test + public void testWindowFiring() throws Exception { + ContinuousProcessingTimeTrigger<TimeWindow> trigger = + ContinuousProcessingTimeTrigger.of(Time.milliseconds(5)); + + assertTrue(trigger.canMerge()); + + ListStateDescriptor<Integer> stateDesc = + new ListStateDescriptor<>( + "window-contents", + BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())); + + WindowOperator<Byte, Integer, Iterable<Integer>, WindowedInteger, TimeWindow> operator = + new WindowOperator<>( + TumblingProcessingTimeWindows.of(Time.milliseconds(10)), + new TimeWindow.Serializer(), + new NullByteKeySelector<>(), + BasicTypeInfo.BYTE_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new IntegerSumWindowFunction()), + trigger, + 0, + null); + + KeyedOneInputStreamOperatorTestHarness<Byte, Integer, WindowedInteger> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + operator, operator.getKeySelector(), BasicTypeInfo.BYTE_TYPE_INFO); + + ArrayDeque<Object> expectedOutput = new ArrayDeque<>(); + + testHarness.open(); + + // window [0, 10) + testHarness.getProcessingTimeService().setCurrentTime(0); + testHarness.processElement(1, NO_TIMESTAMP); + + // window [0, 10) + testHarness.getProcessingTimeService().setCurrentTime(2); + testHarness.processElement(2, NO_TIMESTAMP); + + // Fire window [0, 10), value is 1+2=3. + testHarness.getProcessingTimeService().setCurrentTime(5); + expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 10), 3), 9)); + TestHarnessUtil.assertOutputEquals( + "Output mismatch", expectedOutput, testHarness.getOutput()); + + // window [0, 10) + testHarness.getProcessingTimeService().setCurrentTime(7); + testHarness.processElement(3, NO_TIMESTAMP); + + // Fire window [0, 10), value is 3+3=6. + testHarness.getProcessingTimeService().setCurrentTime(9); + expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 10), 6), 9)); + TestHarnessUtil.assertOutputEquals( + "Output mismatch", expectedOutput, testHarness.getOutput()); + + // window [10, 20) + testHarness.getProcessingTimeService().setCurrentTime(10); + testHarness.processElement(3, NO_TIMESTAMP); + + // Fire window [10, 20), value is 3. + testHarness.getProcessingTimeService().setCurrentTime(15); + expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(10, 20), 3), 19)); + TestHarnessUtil.assertOutputEquals( + "Output mismatch", expectedOutput, testHarness.getOutput()); + + // window [10, 20) + testHarness.getProcessingTimeService().setCurrentTime(18); + testHarness.processElement(3, NO_TIMESTAMP); + + // Fire window [10, 20), value is 3+3=6. + testHarness.getProcessingTimeService().setCurrentTime(20); + expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(10, 20), 6), 19)); + TestHarnessUtil.assertOutputEquals( + "Output mismatch", expectedOutput, testHarness.getOutput()); + } + @Test public void testMergingWindows() throws Exception { ContinuousProcessingTimeTrigger<TimeWindow> trigger = @@ -149,8 +228,9 @@ public class ContinuousProcessingTimeTriggerTest { TestHarnessUtil.assertOutputEquals( "Output mismatch", expectedOutput, testHarness.getOutput()); - // There is no on time firing for now. + // Firing on time. testHarness.getProcessingTimeService().setCurrentTime(15); + expectedOutput.add(new StreamRecord<>(new WindowedInteger(new TimeWindow(0, 12), 3), 11)); TestHarnessUtil.assertOutputEquals( "Output mismatch", expectedOutput, testHarness.getOutput());
