This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit e4e0f2320489882d4445ebda40dd7c5638f2ca95
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Oct 12 17:38:30 2020 +0200

    [FLINK-19448][connector base] Explicitly check for un-expected condition that would leave an inconsistent state

    This condition should never happen, but if it ever happened, it would leave the source in an idle
    state waiting for more input data, rather than shutting down.
---
 .../flink/connector/base/source/reader/SourceReaderBase.java | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 1182cae..e1da654 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -262,10 +262,13 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt

     private InputStatus finishedOrAvailableLater() {
         final boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
-        if (noMoreSplitsAssignment && allFetchersHaveShutdown && elementsQueue.isEmpty()) {
+        if (!(noMoreSplitsAssignment && allFetchersHaveShutdown)) {
+            return InputStatus.NOTHING_AVAILABLE;
+        }
+        if (elementsQueue.isEmpty()) {
             return InputStatus.END_OF_INPUT;
         } else {
-            return InputStatus.NOTHING_AVAILABLE;
+            throw new IllegalStateException("Called 'finishedOrAvailableLater()' with shut-down fetchers but non-empty queue");
         }
     }

