[
https://issues.apache.org/jira/browse/FLINK-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333725#comment-15333725
]
ASF GitHub Bot commented on FLINK-3714:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2093#discussion_r67339362
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
---
@@ -917,6 +929,465 @@ public void testRestoreAndSnapshotAreInSync() throws
Exception {
Assert.assertEquals(operator.processingTimeTimerTimestamps,
otherOperator.processingTimeTimerTimestamps);
}
+ @Test
+ public void testLateness() throws Exception {
+ final int WINDOW_SIZE = 2;
+ final long LATENESS = 500;
+
+ TypeInformation<Tuple2<String, Integer>> inputType =
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc =
new ReducingStateDescriptor<>("window-contents",
+ new SumReducer(),
+ inputType.createSerializer(new ExecutionConfig()));
+
+ WindowOperator<String, Tuple2<String, Integer>, Tuple2<String,
Integer>, Tuple2<String, Integer>, TimeWindow> operator =
+ new WindowOperator<>(
+
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ new TimeWindow.Serializer(),
+ new TupleKeySelector(),
+
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalSingleValueWindowFunction<>(new
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+ EventTimeTrigger.create(),
+ LATENESS);
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>,
Tuple2<String, Integer>> testHarness =
+ new OneInputStreamOperatorTestHarness<>(operator);
+
+ testHarness.configureForKeyedStream(new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO);
+
+ operator.setInputType(inputType, new ExecutionConfig());
+ testHarness.open();
+
+ long initialTime = 0L;
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 500));
+ testHarness.processWatermark(new Watermark(initialTime + 1500));
+
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1300));
+
+ testHarness.processWatermark(new Watermark(initialTime + 2300));
+
+ // this will not be dropped because window.maxTimestamp() +
allowedLateness > currentWatermark
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1997));
+ testHarness.processWatermark(new Watermark(initialTime + 6000));
+
+ // this will be dropped because window.maxTimestamp() +
allowedLateness < currentWatermark
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1998));
+ testHarness.processWatermark(new Watermark(initialTime + 7000));
+
+ Tuple2<String, Integer> el1 = new Tuple2<>("key2", 2);
+ // the following is 1 and not 3because the trigger fires and
purges.
+ Tuple2<String, Integer> el2 = new Tuple2<>("key2", 1);
+
+ ConcurrentLinkedQueue<Object> expected = new
ConcurrentLinkedQueue<>();
+
+ expected.add(new Watermark(initialTime + 1500));
+ expected.add(new StreamRecord<>(el1, initialTime + 1999));
+
+ expected.add(new Watermark(initialTime + 2300));
+ expected.add(new StreamRecord<>(el2, initialTime + 1999));
+
+ expected.add(new Watermark(initialTime + 6000));
+ expected.add(new Watermark(initialTime + 7000));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
+ testHarness.close();
+ }
+
+ @Test
+ public void testDropDueToLatenessTumbling() throws Exception {
--- End diff --
No element is dropped in this test. If I'm not mistaken.
> Add Support for "Allowed Lateness"
> ----------------------------------
>
> Key: FLINK-3714
> URL: https://issues.apache.org/jira/browse/FLINK-3714
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Reporter: Aljoscha Krettek
> Assignee: Kostas Kloudas
>
> As mentioned in
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#
> we should add support for an allowed lateness setting.
> This includes several things:
> - API for setting allowed lateness
> - Dropping of late elements
> - Garbage collection of windows state/timers
> Depending on whether the {{WindowAssigner}} assigns windows based on event
> time or processing time we have to adjust the GC behavior. For event-time
> windows "allowed lateness" makes sense and we should garbage collect after
> this expires. For processing-time windows "allowed lateness" does not make
> sense and we should always GC window state/timers at the end timestamp of a
> processing-time window. I think that we need a method for this on
> {{WindowAssigner}} that allows to differentiate between event-time windows
> and processing-time windows: {{boolean WindowAssigner.isEventTime()}}.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)