AHeise commented on a change in pull request #17792:
URL: https://github.com/apache/flink/pull/17792#discussion_r751969408
##########
File path:
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/LimitableBulkFormat.java
##########
@@ -105,8 +111,16 @@ private boolean reachLimit() {
return null;
}
- RecordIterator<T> batch = reader.readBatch();
- return batch == null ? null : new LimitableIterator(batch);
+ try {
+ RecordIterator<T> batch = reader.readBatch();
+ return batch == null ? null : new LimitableIterator(batch);
+ } catch (Exception e) {
Review comment:
This change is a no-op in a sequential threading model. If
`reachLimit()` returns true, then the first `if` in `readBatch` already returns
`null`. In all other cases, the exception is rethrown.
So I'm assuming you are actually guarding against some concurrent
modification in another thread. If so, then I'd rather fix the threading model.
This class already uses two different mechanisms for dealing with concurrency
(`synchronized` and `AtomicLong`), and you now add optimistic invocation with
exception handling as a third. This is hard to reason about. You should
probably settle on a `Lock` and use it throughout.
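A minimal sketch of what a single-`Lock` approach could look like. The class
`LimitCounter` and its methods are hypothetical names for illustration only, not
code from this PR or from `LimitableBulkFormat`; the point is just that the
check and the increment go through the same lock:

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical helper: counts emitted records against a fixed limit. */
final class LimitCounter {
    private final Lock lock = new ReentrantLock();
    private final long limit;
    private long numRead; // guarded by lock

    LimitCounter(long limit) {
        this.limit = limit;
    }

    /** Returns true once the limit has been reached. */
    boolean reachedLimit() {
        lock.lock();
        try {
            return numRead >= limit;
        } finally {
            lock.unlock();
        }
    }

    /** Counts one record if still below the limit; returns false otherwise. */
    boolean tryCount() {
        lock.lock();
        try {
            if (numRead >= limit) {
                return false;
            }
            numRead++;
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

With something like this, the check and the update are atomic with respect to
each other, so there would be no need to catch exceptions and re-check the limit
afterwards.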
Btw I don't understand many parts of this class:
- What's the purpose of `globalNumberRead`? The bulk format instance will be
replicated per subtask, so why would you need it?
- Do you ensure that only one subtask is executed, or how do you consolidate
the different lists of the different subtasks? `LIMIT N` would produce a list of
`N` records in each subtask, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.