xuyangzhong commented on code in PR #26708:
URL: https://github.com/apache/flink/pull/26708#discussion_r2162802560


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/keyordered/Epoch.java:
##########
@@ -72,6 +74,7 @@ public void setOutput(Consumer<StreamElementQueueEntry<OUT>> outputConsumer) {
 
     public void decrementCount() {
         ongoingRecordCount--;
+        Preconditions.checkState(ongoingRecordCount >= 0);

Review Comment:
   The inconsistency lies in the `submitRecord` method of 
`TableAsyncExecutionController`: although a record is placed into an epoch 
there, the ongoing record counter, `ongoingRecordCount`, is not incremented.
   
   > "I wonder why the caller cannot check that the decrementCount should not 
be called if there is nothing to decrement."
   
   I'm not the developer of the TableAsyncExecutionController, but I suspect 
this design is intended to minimize the exposure of the `ongoingRecordCount` 
counter to external components.
   
   > "Check before the decrement."
   
   Done.
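   
   For reference, a minimal sketch of the "check before the decrement" shape. 
`EpochCounterSketch` is a hypothetical, simplified stand-in for `Epoch`, and it 
throws a plain `IllegalStateException` instead of using 
`Preconditions.checkState`, so that it compiles without Flink on the classpath. 
It assumes single-threaded access and is not the actual code in this PR.

```java
/** Simplified, hypothetical stand-in for the counter in Epoch; not the PR's code. */
final class EpochCounterSketch {
    // Records assigned to this epoch that have not finished yet.
    // Assumption: accessed from a single thread only, so no synchronization.
    private int ongoingRecordCount;

    void incrementCount() {
        ongoingRecordCount++;
    }

    void decrementCount() {
        // Validate first, so that a failed check leaves the counter unchanged.
        if (ongoingRecordCount <= 0) {
            throw new IllegalStateException(
                    "decrementCount() called with no ongoing records");
        }
        ongoingRecordCount--;
    }

    int getOngoingRecordCount() {
        return ongoingRecordCount;
    }

    public static void main(String[] args) {
        EpochCounterSketch epoch = new EpochCounterSketch();
        epoch.incrementCount();
        epoch.decrementCount();
        System.out.println(epoch.getOngoingRecordCount());
        try {
            // Nothing left to decrement: the check rejects the call.
            epoch.decrementCount();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   Checking first means the counter never goes negative, even if the caller 
catches the exception and continues.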
   
   > "The logic should be made thread-safe."
   
   Currently, this logic runs only in the mailbox thread, so the counter is 
never accessed concurrently.
   
   According to FLINK-37921, the `TableAsyncExecutionController` will be 
removed, so I would prefer not to invest much effort in refactoring this code. 
That also avoids blocking 2.1 (this issue has already caused too many CI 
failures). WDYT? @davidradl @xishuaidelin 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

