spuru9 commented on code in PR #28530:
URL: https://github.com/apache/flink/pull/28530#discussion_r3476380286


##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java:
##########
@@ -494,6 +506,77 @@ void testEventTimeCumulativeWindows() throws Exception {
         testHarness.close();
     }
 
+    @TestTemplate
+    void testGlobalEventTimeCumulativeWindowsDoNotRefireExpiredWindowAfterRestore()
+            throws Exception {
+        final SliceAssigner assigner =
+                SliceAssigners.cumulative(
+                        3, shiftTimeZone, Duration.ofSeconds(3), Duration.ofSeconds(1));
+        final SlicingSumAndCountAggsFunction globalAggsFunction =
+                new SlicingSumAndCountAggsFunction(assigner);
+        final SlicingSumAndCountAggsFunction stateAggsFunction =
+                new SlicingSumAndCountAggsFunction(assigner);
+        OneInputStreamOperator<RowData, RowData> operator =
+                buildGlobalWindowOperator(
+                        assigner,
+                        LOCAL_ACC_INPUT_ROW_SER,
+                        new LocalAccumulatorRowsAggsFunction(assigner),
+                        globalAggsFunction,
+                        stateAggsFunction,
+                        null);
+
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(operator);
+
+        testHarness.setup(OUT_SERIALIZER);
+        testHarness.open();
+
+        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+        testHarness.processElement(insertRecord("key1", 1L, 1L, fromEpochMillis(20L)));
+        testHarness.processElement(insertRecord("key1", 1L, 1L, fromEpochMillis(0L)));
+        testHarness.processElement(insertRecord("key1", 1L, 1L, fromEpochMillis(999L)));
+
+        testHarness.processElement(insertRecord("key2", 1L, 1L, fromEpochMillis(1998L)));
+        testHarness.processElement(insertRecord("key2", 1L, 1L, fromEpochMillis(1999L)));
+        testHarness.processElement(insertRecord("key2", 1L, 1L, fromEpochMillis(1000L)));
+
+        testHarness.processWatermark(new Watermark(999));
+        expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L), localMills(1000L)));
+        expectedOutput.add(new Watermark(999));
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        testHarness.processWatermark(new Watermark(1999));
+        expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L), localMills(2000L)));
+        expectedOutput.add(insertRecord("key2", 3L, 3L, localMills(0L), localMills(2000L)));
+        expectedOutput.add(new Watermark(1999));
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, testHarness.getOutput());
+
+        testHarness.prepareSnapshotPreBarrier(0L);
+        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+        testHarness.close();

Review Comment:
   nit (optional): Unlike all other restore/close tests in this file, this one
never asserts globalAggsFunction.closeCalled or stateAggsFunction.closeCalled
after close().
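
   To illustrate the kind of check meant here, below is a minimal, self-contained sketch. It does not use the real Flink classes: `RecordingAggsFunction`, `FakeOperator`, and `assertClosed` are hypothetical stand-ins, and only the `closeCalled` flag name is taken from the test. In the actual test, the equivalent would presumably be an assertion on `globalAggsFunction.closeCalled` and `stateAggsFunction.closeCalled` placed right after `testHarness.close()`.

```java
public class CloseAssertionSketch {

    /** Hypothetical stand-in for an aggs function that records whether close() ran. */
    static class RecordingAggsFunction implements AutoCloseable {
        boolean closeCalled = false;

        @Override
        public void close() {
            closeCalled = true;
        }
    }

    /** Hypothetical stand-in for the operator under test: closing it closes both functions. */
    static class FakeOperator implements AutoCloseable {
        private final RecordingAggsFunction globalAggs;
        private final RecordingAggsFunction stateAggs;

        FakeOperator(RecordingAggsFunction globalAggs, RecordingAggsFunction stateAggs) {
            this.globalAggs = globalAggs;
            this.stateAggs = stateAggs;
        }

        @Override
        public void close() {
            globalAggs.close();
            stateAggs.close();
        }
    }

    /** Fails with a descriptive message if the function was never closed. */
    static void assertClosed(String name, RecordingAggsFunction fn) {
        if (!fn.closeCalled) {
            throw new AssertionError(name + ": close was not called.");
        }
    }

    public static void main(String[] args) {
        RecordingAggsFunction globalAggsFunction = new RecordingAggsFunction();
        RecordingAggsFunction stateAggsFunction = new RecordingAggsFunction();
        FakeOperator operator = new FakeOperator(globalAggsFunction, stateAggsFunction);

        // Corresponds to testHarness.close() before the restore step.
        operator.close();

        // The check this nit asks for: both functions observed the close.
        assertClosed("globalAggsFunction", globalAggsFunction);
        assertClosed("stateAggsFunction", stateAggsFunction);
    }
}
```

   Asserting on both flags separately keeps a failure message specific about which function was left open.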



