This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cdc0721f6f4da04e5d08dd0f63a9c503b34410a7
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Oct 12 17:38:30 2020 +0200

    [FLINK-19448][connector base] Explicitly check for unexpected condition that would leave an inconsistent state
    
    This condition should never happen, but if it ever did, it would leave the
    source in an idle state waiting for more input data, rather than shutting down.
---
 .../flink/connector/base/source/reader/SourceReaderBase.java       | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 1182cae..e1da654 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -262,10 +262,13 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 
        private InputStatus finishedOrAvailableLater() {
                final boolean allFetchersHaveShutdown = 
splitFetcherManager.maybeShutdownFinishedFetchers();
-               if (noMoreSplitsAssignment && allFetchersHaveShutdown && 
elementsQueue.isEmpty()) {
+               if (!(noMoreSplitsAssignment && allFetchersHaveShutdown)) {
+                       return InputStatus.NOTHING_AVAILABLE;
+               }
+               if (elementsQueue.isEmpty()) {
                        return InputStatus.END_OF_INPUT;
                } else {
-                       return InputStatus.NOTHING_AVAILABLE;
+                       throw new IllegalStateException("Called 
'finishedOrAvailableLater()' with shut-down fetchers but non-empty queue");
                }
        }
 

Reply via email to