This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e4e0f2320489882d4445ebda40dd7c5638f2ca95
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Oct 12 17:38:30 2020 +0200

    [FLINK-19448][connector base] Explicitly check for un-expected condition 
that would leave an inconsistent state
    
    This condition should never happen, but if it ever happened, it would leave 
the source in an idle state waiting for
    more input data, rather than shutting down.
---
 .../flink/connector/base/source/reader/SourceReaderBase.java       | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 1182cae..e1da654 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -262,10 +262,13 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
 
        private InputStatus finishedOrAvailableLater() {
                final boolean allFetchersHaveShutdown = 
splitFetcherManager.maybeShutdownFinishedFetchers();
-               if (noMoreSplitsAssignment && allFetchersHaveShutdown && 
elementsQueue.isEmpty()) {
+               if (!(noMoreSplitsAssignment && allFetchersHaveShutdown)) {
+                       return InputStatus.NOTHING_AVAILABLE;
+               }
+               if (elementsQueue.isEmpty()) {
                        return InputStatus.END_OF_INPUT;
                } else {
-                       return InputStatus.NOTHING_AVAILABLE;
+                       throw new IllegalStateException("Called 
'finishedOrAvailableLater()' with shut-down fetchers but non-empty queue");
                }
        }
 

Reply via email to