tweise commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r651630642
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##########
@@ -78,9 +89,14 @@ public InputStatus pollNext(ReaderOutput output) throws
Exception {
// next source can potentially be activated (after all readers
are ready).
readerContext.sendSourceEventToCoordinator(
new SourceReaderFinishedEvent(currentSourceIndex));
- // More data will be available from the next reader.
+ if (!pendingSplits.isEmpty()) {
+ // we have splits for another reader waiting
+ setCurrentReader(pendingSplits.peek().sourceIndex());
+ return InputStatus.MORE_AVAILABLE;
+ }
+ // More splits may arrive for this or subsequent reader.
// InputStatus.NOTHING_AVAILABLE requires us to complete the
availability
- // future after source switch to resume poll.
+ // future after receiving more splits to resume poll.
return InputStatus.NOTHING_AVAILABLE;
Review comment:
We started with that but that lead to unnecessary noise. We want the
polling to stop until the next reader is set.
3a01cd27d1fb36e1fa7faf5500bf97617ce22ed2
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]