Au-Miner commented on code in PR #26708:
URL: https://github.com/apache/flink/pull/26708#discussion_r2160656829


##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/lookup/TableKeyedAsyncWaitOperatorTest.java:
##########
@@ -349,23 +350,20 @@ public void testSnapshotAndRestore() throws Exception {
         testHarness.processElement(new StreamRecord<>(6, initialTime + 6));
         testHarness.processElement(new StreamRecord<>(7, initialTime + 7));
         testHarness.processElement(new StreamRecord<>(8, initialTime + 8));
+        testHarness.processWatermark(Long.MAX_VALUE);

Review Comment:
   Maybe we can move `processWatermark` between `initializeState()` and `StreamRecord(5, 5)`, so we can check whether `ongoingRecordCount` meets our expectations. In that case, the `ongoingRecordCount` of `activeEpoch` should change 4 -> 0 -> 4 -> 0 across these steps: before `processWatermark` -> after `processWatermark` -> after adding `StreamRecord(5, 6, 7, 8)` -> after `endInput`.
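
   To make the expected 4 -> 0 -> 4 -> 0 sequence concrete, here is a rough, self-contained sketch. It is only a toy model and does not use Flink's actual classes: `Epoch`, `ToyEpochTracker`, and all of their methods are hypothetical names. It assumes that restoring state leaves four in-flight records in the active epoch, that a watermark seals the active epoch and opens a fresh one, and that `endInput` waits for every outstanding record to complete.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class EpochCountSketch {

    /** Hypothetical stand-in for an epoch; not Flink's actual class. */
    static final class Epoch {
        int ongoingRecordCount;
    }

    /** Toy tracker (assumption): a watermark seals the active epoch and opens a new one. */
    static final class ToyEpochTracker {
        private final Deque<Epoch> closedEpochs = new ArrayDeque<>();
        private Epoch activeEpoch = new Epoch();

        /** A record enters the active epoch and stays in flight until completed. */
        void onRecord() {
            activeEpoch.ongoingRecordCount++;
        }

        /** The watermark closes the current epoch; new records go to a fresh one. */
        void onWatermark() {
            closedEpochs.add(activeEpoch);
            activeEpoch = new Epoch();
        }

        /** Models endInput: every in-flight record in every epoch completes. */
        void completeAll() {
            for (Epoch epoch : closedEpochs) {
                epoch.ongoingRecordCount = 0;
            }
            closedEpochs.clear();
            activeEpoch.ongoingRecordCount = 0;
        }

        int activeCount() {
            return activeEpoch.ongoingRecordCount;
        }
    }

    /** Records the active epoch's count at each of the four checkpoints. */
    static List<Integer> trace() {
        ToyEpochTracker tracker = new ToyEpochTracker();
        List<Integer> counts = new ArrayList<>();

        // Assumption: restore re-emits four in-flight records (1..4).
        for (int i = 0; i < 4; i++) {
            tracker.onRecord();
        }
        counts.add(tracker.activeCount()); // before processWatermark

        tracker.onWatermark();
        counts.add(tracker.activeCount()); // after processWatermark

        // Records 5, 6, 7, 8 arrive in the new active epoch.
        for (int i = 0; i < 4; i++) {
            tracker.onRecord();
        }
        counts.add(tracker.activeCount()); // after adding the new records

        tracker.completeAll();
        counts.add(tracker.activeCount()); // after endInput

        return counts;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints [4, 0, 4, 0]
    }
}
```

   In the real test, the equivalent checks would be assertions on the operator's epoch state after each harness call. How that state is exposed to the test is not shown here and would depend on the operator's actual accessors.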