This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0f63cb9a5f633d9fa151b9adf0e63b4f912b38e2 Author: Stephan Ewen <[email protected]> AuthorDate: Tue Sep 15 21:27:00 2020 +0200 [FLINK-19225][connectors] Various small improvements to SourceReaderBase (part 2) - SourceReaderBase no longer returns to the caller / mailbox without emitting an element when transitioning between fetches - Avoid eagerly checking the queue-empty condition (which requires lock acquisition) when determining whether end of input is reached. Check that expensive condition last instead. --- .../flink/connector/base/source/reader/SourceReaderBase.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index fb4e6df9..97a6a95 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -136,7 +136,10 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt return trace(InputStatus.MORE_AVAILABLE); } else if (!moveToNextSplit(recordsWithSplitId, output)) { - return trace(finishedOrAvailableLater()); + // The fetch is done and we just discovered that and have not emitted anything, yet. + // We need to move to the next fetch. As a shortcut, we call pollNext() here again, + // rather than emitting nothing and waiting for the caller to call us again. 
+ return pollNext(output); } // else fall through the loop } @@ -258,9 +261,8 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt // ------------------ private helper methods --------------------- private InputStatus finishedOrAvailableLater() { - boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers(); - boolean allElementsEmitted = elementsQueue.isEmpty(); - if (noMoreSplitsAssignment && allFetchersHaveShutdown && allElementsEmitted) { + final boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers(); + if (noMoreSplitsAssignment && allFetchersHaveShutdown && elementsQueue.isEmpty()) { return InputStatus.END_OF_INPUT; } else { return InputStatus.NOTHING_AVAILABLE;
