xuyangzhong commented on code in PR #26708: URL: https://github.com/apache/flink/pull/26708#discussion_r2162802560
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/keyordered/Epoch.java:
##########
@@ -72,6 +74,7 @@ public void setOutput(Consumer<StreamElementQueueEntry<OUT>> outputConsumer) {
     public void decrementCount() {
         ongoingRecordCount--;
+        Preconditions.checkState(ongoingRecordCount >= 0);

Review Comment:
   The inconsistency lies in the `submitRecord` method of the `TableAsyncExecutionController`: although we place a record into an epoch, the epoch's counter, `ongoingRecordCount`, is not actually incremented by 1.

   > "I wonder why the caller cannot check that the decrementCount should not be called if there is nothing to decrement."

   I'm not the developer of the `TableAsyncExecutionController`, but I suspect this design is intended to minimize the exposure of the `ongoingRecordCount` counter to external components.

   > "Check before the decrement."

   Done.

   > "The logic should be made thread-safe."

   Currently, this logic runs only in the mailbox thread. According to FLINK-37921, the `TableAsyncExecutionController` will be removed, so I would prefer not to invest too much effort in refactoring this code, to avoid blocking 2.1 (this has resulted in too many CI failures). WDYT? @davidradl @xishuaidelin
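For readers following the "check before the decrement" point, here is a minimal sketch of what that ordering might look like. It is not the actual `Epoch` class: `EpochCounterSketch`, `incrementCount`, and `getOngoingRecordCount` are hypothetical names used only for illustration, and a plain `IllegalStateException` stands in for Flink's `Preconditions.checkState` so the example has no dependencies. Like the code under review, it assumes single-threaded access (for example, from the mailbox thread) and is not thread-safe.

```java
/** Hypothetical stand-in for the counter in Epoch; not the actual Flink class. */
final class EpochCounterSketch {
    // Number of records currently in flight for this epoch.
    // Assumed to be touched by a single thread only.
    private int ongoingRecordCount;

    void incrementCount() {
        ongoingRecordCount++;
    }

    void decrementCount() {
        // Validate BEFORE mutating, so a failed check leaves the counter unchanged
        // instead of leaving it at a negative value.
        if (ongoingRecordCount <= 0) {
            throw new IllegalStateException(
                    "decrementCount() called with no ongoing records");
        }
        ongoingRecordCount--;
    }

    int getOngoingRecordCount() {
        return ongoingRecordCount;
    }

    public static void main(String[] args) {
        EpochCounterSketch epoch = new EpochCounterSketch();
        epoch.incrementCount();
        epoch.decrementCount();
        System.out.println("count after balanced calls: " + epoch.getOngoingRecordCount());

        try {
            // An unbalanced decrement is rejected and the state stays consistent.
            epoch.decrementCount();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println("count after rejected call: " + epoch.getOngoingRecordCount());
    }
}
```

With the check placed after the decrement, as in the diff hunk quoted in this comment, a failed check would leave the counter at -1. Checking first keeps the counter at 0 when the call is rejected.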