fredia commented on code in PR #24917:
URL: https://github.com/apache/flink/pull/24917#discussion_r1666410896
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java:
##########
@@ -3130,6 +3130,113 @@ void
testCleanupTimerWithEmptyReduceStateForSessionWindows() throws Exception {
testHarness.close();
}
+ @Test
+ void testCleanupTimerWithEmptyStateNoResultForTumblingWindows() throws
Exception {
+ final int windowSize = 2;
+ final long lateness = 1;
+
+ ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
+ new ListStateDescriptor<>(
+ "window-contents",
+ STRING_INT_TUPLE.createSerializer(new
SerializerConfigImpl()));
+
+ WindowOperator<
+ String,
+ Tuple2<String, Integer>,
+ Iterable<Tuple2<String, Integer>>,
+ Tuple2<String, Integer>,
+ TimeWindow>
+ operator =
+ new WindowOperator<>(
+
TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
+ new TimeWindow.Serializer(),
+ new TupleKeySelector(),
+
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
+ new SerializerConfigImpl()),
+ windowStateDesc,
+ new InternalIterableWindowFunction<>(new
EmptyReturnFunction()),
+ new
FireEverytimeOnElementAndEventTimeTrigger(),
+ lateness,
+ null /* late data output tag */);
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>,
Tuple2<String, Integer>>
+ testHarness = createTestHarness(operator);
+
+ testHarness.open();
+
+ ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+ // normal element
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("test_key",
1), 1000));
+ assertThat(
+ operator.processContext
+ .windowState()
+ .getListState(windowStateDesc)
+ .get()
+ .toString())
+ .isEqualTo("[(test_key,1)]");
+ testHarness.processWatermark(new Watermark(1599));
+ assertThat(
+ operator.processContext
+ .windowState()
+ .getListState(windowStateDesc)
+ .get()
+ .toString())
+ .isEqualTo("[(test_key,1)]");
+ testHarness.processWatermark(new Watermark(1699));
+ assertThat(
+ operator.processContext
+ .windowState()
+ .getListState(windowStateDesc)
+ .get()
+ .toString())
+ .isEqualTo("[(test_key,1)]");
+ testHarness.processWatermark(new Watermark(1799));
+ assertThat(
+ operator.processContext
+ .windowState()
+ .getListState(windowStateDesc)
+ .get()
+ .toString())
+ .isEqualTo("[(test_key,1)]");
+ testHarness.processWatermark(new Watermark(1999));
+ assertThat(
+ operator.processContext
+ .windowState()
+ .getListState(windowStateDesc)
+ .get()
+ .toString())
+ .isEqualTo("[(test_key,1)]");
+ testHarness.processWatermark(new Watermark(2000));
+ assertThat(
+ operator.processContext
+ .windowState()
+ .getListState(windowStateDesc)
+ .get()
+ .toString())
+ .isEqualTo("[]");
+ testHarness.processWatermark(new Watermark(5000));
+ assertThat(
+ operator.processContext
+ .windowState()
+ .getListState(windowStateDesc)
+ .get()
+ .toString())
+ .isEqualTo("[]");
+
+ expected.add(new Watermark(1599));
+ expected.add(new Watermark(1699));
+ expected.add(new Watermark(1799));
+ expected.add(new Watermark(1999)); // here it fires and purges
+ expected.add(new Watermark(2000)); // here is the cleanup timer
+ expected.add(new Watermark(5000));
+
+ System.out.println(expected);
Review Comment:
nit: please remove the debugging `System.out.println()` call.
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java:
##########
@@ -3130,6 +3130,113 @@ void
testCleanupTimerWithEmptyReduceStateForSessionWindows() throws Exception {
testHarness.close();
}
+ @Test
+ void testCleanupTimerWithEmptyStateNoResultForTumblingWindows() throws
Exception {
+ final int windowSize = 2;
+ final long lateness = 1;
+
+ ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
+ new ListStateDescriptor<>(
+ "window-contents",
+ STRING_INT_TUPLE.createSerializer(new
SerializerConfigImpl()));
+
+ WindowOperator<
+ String,
+ Tuple2<String, Integer>,
+ Iterable<Tuple2<String, Integer>>,
+ Tuple2<String, Integer>,
+ TimeWindow>
+ operator =
+ new WindowOperator<>(
+
TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
+ new TimeWindow.Serializer(),
+ new TupleKeySelector(),
+
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
+ new SerializerConfigImpl()),
+ windowStateDesc,
+ new InternalIterableWindowFunction<>(new
EmptyReturnFunction()),
+ new
FireEverytimeOnElementAndEventTimeTrigger(),
+ lateness,
+ null /* late data output tag */);
+
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>,
Tuple2<String, Integer>>
+ testHarness = createTestHarness(operator);
+
+ testHarness.open();
+
+ ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+ // normal element
+ testHarness.processElement(new StreamRecord<>(new Tuple2<>("test_key",
1), 1000));
+ assertThat(
+ operator.processContext
+ .windowState()
+ .getListState(windowStateDesc)
+ .get()
+ .toString())
+ .isEqualTo("[(test_key,1)]");
+ testHarness.processWatermark(new Watermark(1599));
+ assertThat(
+ operator.processContext
+ .windowState()
+ .getListState(windowStateDesc)
+ .get()
+ .toString())
+ .isEqualTo("[(test_key,1)]");
+ testHarness.processWatermark(new Watermark(1699));
+ assertThat(
+ operator.processContext
+ .windowState()
+ .getListState(windowStateDesc)
+ .get()
+ .toString())
+ .isEqualTo("[(test_key,1)]");
+ testHarness.processWatermark(new Watermark(1799));
+ assertThat(
+ operator.processContext
+ .windowState()
+ .getListState(windowStateDesc)
+ .get()
+ .toString())
+ .isEqualTo("[(test_key,1)]");
+ testHarness.processWatermark(new Watermark(1999));
+ assertThat(
+ operator.processContext
+ .windowState()
+ .getListState(windowStateDesc)
+ .get()
+ .toString())
+ .isEqualTo("[(test_key,1)]");
+ testHarness.processWatermark(new Watermark(2000));
+ assertThat(
+ operator.processContext
+ .windowState()
+ .getListState(windowStateDesc)
+ .get()
+ .toString())
+ .isEqualTo("[]");
+ testHarness.processWatermark(new Watermark(5000));
+ assertThat(
+ operator.processContext
+ .windowState()
+ .getListState(windowStateDesc)
+ .get()
+ .toString())
+ .isEqualTo("[]");
+
+ expected.add(new Watermark(1599));
+ expected.add(new Watermark(1699));
+ expected.add(new Watermark(1799));
+ expected.add(new Watermark(1999)); // here it fires and purges
+ expected.add(new Watermark(2000)); // here is the cleanup timer
+ expected.add(new Watermark(5000));
+
+ System.out.println(expected);
+ System.out.println(testHarness.getOutput());
+ TestHarnessUtil.assertOutputEqualsSorted(
+ "Output was not correct.", expected, expected, new
Tuple2ResultSortComparator());
Review Comment:
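The assertion currently compares `expected` with itself, so it can never fail;
it should compare against the harness output instead:
```suggestion
"Output was not correct.", expected,
testHarness.getOutput(), new Tuple2ResultSortComparator());
```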
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]