Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2093#discussion_r67339362
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
---
@@ -917,6 +929,465 @@ public void testRestoreAndSnapshotAreInSync() throws
Exception {
Assert.assertEquals(operator.processingTimeTimerTimestamps,
otherOperator.processingTimeTimerTimestamps);
}
+ @Test
+ public void testLateness() throws Exception {
+ final int WINDOW_SIZE = 2;
+ final long LATENESS = 500;
+
+ TypeInformation<Tuple2<String, Integer>> inputType =
TypeInfoParser.parse("Tuple2<String, Integer>");
+
+ ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc =
new ReducingStateDescriptor<>("window-contents",
+ new SumReducer(),
+ inputType.createSerializer(new ExecutionConfig()));
+
+ WindowOperator<String, Tuple2<String, Integer>, Tuple2<String,
Integer>, Tuple2<String, Integer>, TimeWindow> operator =
+ new WindowOperator<>(
+
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
+ new TimeWindow.Serializer(),
+ new TupleKeySelector(),
+
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+ stateDesc,
+ new InternalSingleValueWindowFunction<>(new
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
+ EventTimeTrigger.create(),
+ LATENESS);
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>,
Tuple2<String, Integer>> testHarness =
+ new OneInputStreamOperatorTestHarness<>(operator);
+
+ testHarness.configureForKeyedStream(new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO);
+
+ operator.setInputType(inputType, new ExecutionConfig());
+ testHarness.open();
+
+ long initialTime = 0L;
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 500));
+ testHarness.processWatermark(new Watermark(initialTime + 1500));
+
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1300));
+
+ testHarness.processWatermark(new Watermark(initialTime + 2300));
+
+ // this will not be dropped because window.maxTimestamp() +
allowedLateness > currentWatermark
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1997));
+ testHarness.processWatermark(new Watermark(initialTime + 6000));
+
+ // this will be dropped because window.maxTimestamp() +
allowedLateness < currentWatermark
+ testHarness.processElement(new StreamRecord<>(new
Tuple2<>("key2", 1), initialTime + 1998));
+ testHarness.processWatermark(new Watermark(initialTime + 7000));
+
+ Tuple2<String, Integer> el1 = new Tuple2<>("key2", 2);
+ // the following is 1 and not 3 because the trigger fires and
purges.
+ Tuple2<String, Integer> el2 = new Tuple2<>("key2", 1);
+
+ ConcurrentLinkedQueue<Object> expected = new
ConcurrentLinkedQueue<>();
+
+ expected.add(new Watermark(initialTime + 1500));
+ expected.add(new StreamRecord<>(el1, initialTime + 1999));
+
+ expected.add(new Watermark(initialTime + 2300));
+ expected.add(new StreamRecord<>(el2, initialTime + 1999));
+
+ expected.add(new Watermark(initialTime + 6000));
+ expected.add(new Watermark(initialTime + 7000));
+
+ TestHarnessUtil.assertOutputEqualsSorted("Output was not
correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
+ testHarness.close();
+ }
+
+ @Test
+ public void testDropDueToLatenessTumbling() throws Exception {
--- End diff --
No element is dropped in this test, if I'm not mistaken.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---