AHeise commented on a change in pull request #17792:
URL: https://github.com/apache/flink/pull/17792#discussion_r751969408



##########
File path: 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/LimitableBulkFormat.java
##########
@@ -105,8 +111,16 @@ private boolean reachLimit() {
                 return null;
             }
 
-            RecordIterator<T> batch = reader.readBatch();
-            return batch == null ? null : new LimitableIterator(batch);
+            try {
+                RecordIterator<T> batch = reader.readBatch();
+                return batch == null ? null : new LimitableIterator(batch);
+            } catch (Exception e) {

Review comment:
       This change is a no-op in a sequential threading model. If 
`reachLimit()` returns true, then the first `if` in `readBatch` already returns 
`null`. In all other cases, the exception is rethrown.
   
   So I'm assuming you are actually guarding against some concurrent 
modification in another thread. If so, then I'd rather fix the threading model. 
This class already uses two different mechanisms of dealing with concurrency 
(`synchronized` and `AtomicLong`) and you now add optimistic invocation with 
exception handling as a third. This is a hard to reason. You probably should 
settle with using a `Lock` and use it all the way.
   
   Btw I don't understand many parts of this class: 
   - What's the purpose of `globalNumberRead`? The bulk format instance will be 
replicated per subtask, so why would you need it?
   - Do you ensure that only one subtask is executed or how do you consolidate 
the different lists of the different subtask? `LIMIT N` would produce a list of 
`N` records in each subtask, right? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to