fredia commented on code in PR #24917:
URL: https://github.com/apache/flink/pull/24917#discussion_r1666410896


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java:
##########
@@ -3130,6 +3130,113 @@ void 
testCleanupTimerWithEmptyReduceStateForSessionWindows() throws Exception {
         testHarness.close();
     }
 
+    @Test
+    void testCleanupTimerWithEmptyStateNoResultForTumblingWindows() throws 
Exception {
+        final int windowSize = 2;
+        final long lateness = 1;
+
+        ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
+                new ListStateDescriptor<>(
+                        "window-contents",
+                        STRING_INT_TUPLE.createSerializer(new 
SerializerConfigImpl()));
+
+        WindowOperator<
+                        String,
+                        Tuple2<String, Integer>,
+                        Iterable<Tuple2<String, Integer>>,
+                        Tuple2<String, Integer>,
+                        TimeWindow>
+                operator =
+                        new WindowOperator<>(
+                                
TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
+                                new TimeWindow.Serializer(),
+                                new TupleKeySelector(),
+                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
+                                        new SerializerConfigImpl()),
+                                windowStateDesc,
+                                new InternalIterableWindowFunction<>(new 
EmptyReturnFunction()),
+                                new 
FireEverytimeOnElementAndEventTimeTrigger(),
+                                lateness,
+                                null /* late data output tag */);
+
+        OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>>
+                testHarness = createTestHarness(operator);
+
+        testHarness.open();
+
+        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+        // normal element
+        testHarness.processElement(new StreamRecord<>(new Tuple2<>("test_key", 
1), 1000));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[(test_key,1)]");
+        testHarness.processWatermark(new Watermark(1599));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[(test_key,1)]");
+        testHarness.processWatermark(new Watermark(1699));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[(test_key,1)]");
+        testHarness.processWatermark(new Watermark(1799));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[(test_key,1)]");
+        testHarness.processWatermark(new Watermark(1999));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[(test_key,1)]");
+        testHarness.processWatermark(new Watermark(2000));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[]");
+        testHarness.processWatermark(new Watermark(5000));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[]");
+
+        expected.add(new Watermark(1599));
+        expected.add(new Watermark(1699));
+        expected.add(new Watermark(1799));
+        expected.add(new Watermark(1999)); // here it fires and purges
+        expected.add(new Watermark(2000)); // here is the cleanup timer
+        expected.add(new Watermark(5000));
+
+        System.out.println(expected);

Review Comment:
   nit: remove ` System.out.println()`



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java:
##########
@@ -3130,6 +3130,113 @@ void 
testCleanupTimerWithEmptyReduceStateForSessionWindows() throws Exception {
         testHarness.close();
     }
 
+    @Test
+    void testCleanupTimerWithEmptyStateNoResultForTumblingWindows() throws 
Exception {
+        final int windowSize = 2;
+        final long lateness = 1;
+
+        ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
+                new ListStateDescriptor<>(
+                        "window-contents",
+                        STRING_INT_TUPLE.createSerializer(new 
SerializerConfigImpl()));
+
+        WindowOperator<
+                        String,
+                        Tuple2<String, Integer>,
+                        Iterable<Tuple2<String, Integer>>,
+                        Tuple2<String, Integer>,
+                        TimeWindow>
+                operator =
+                        new WindowOperator<>(
+                                
TumblingEventTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)),
+                                new TimeWindow.Serializer(),
+                                new TupleKeySelector(),
+                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
+                                        new SerializerConfigImpl()),
+                                windowStateDesc,
+                                new InternalIterableWindowFunction<>(new 
EmptyReturnFunction()),
+                                new 
FireEverytimeOnElementAndEventTimeTrigger(),
+                                lateness,
+                                null /* late data output tag */);
+
+        OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>>
+                testHarness = createTestHarness(operator);
+
+        testHarness.open();
+
+        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+        // normal element
+        testHarness.processElement(new StreamRecord<>(new Tuple2<>("test_key", 
1), 1000));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[(test_key,1)]");
+        testHarness.processWatermark(new Watermark(1599));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[(test_key,1)]");
+        testHarness.processWatermark(new Watermark(1699));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[(test_key,1)]");
+        testHarness.processWatermark(new Watermark(1799));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[(test_key,1)]");
+        testHarness.processWatermark(new Watermark(1999));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[(test_key,1)]");
+        testHarness.processWatermark(new Watermark(2000));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[]");
+        testHarness.processWatermark(new Watermark(5000));
+        assertThat(
+                        operator.processContext
+                                .windowState()
+                                .getListState(windowStateDesc)
+                                .get()
+                                .toString())
+                .isEqualTo("[]");
+
+        expected.add(new Watermark(1599));
+        expected.add(new Watermark(1699));
+        expected.add(new Watermark(1799));
+        expected.add(new Watermark(1999)); // here it fires and purges
+        expected.add(new Watermark(2000)); // here is the cleanup timer
+        expected.add(new Watermark(5000));
+
+        System.out.println(expected);
+        System.out.println(testHarness.getOutput());
+        TestHarnessUtil.assertOutputEqualsSorted(
+                "Output was not correct.", expected, expected, new 
Tuple2ResultSortComparator());

Review Comment:
   ```suggestion
                   "Output was not correct.", expected, 
testHarness.getOutput(), new Tuple2ResultSortComparator());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to