This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0f63cb9a5f633d9fa151b9adf0e63b4f912b38e2
Author: Stephan Ewen <[email protected]>
AuthorDate: Tue Sep 15 21:27:00 2020 +0200

    [FLINK-19225][connectors] Various small improvements to SourceReaderBase (part 2)
    
      - SourceReaderBase avoids not emitting an element (and exiting to caller / mailbox)
        when transitioning between fetches
    
      - Avoid eager checking of queue empty condition (requires lock acquisition) when
        determining whether end of input is reached. Check that expensive condition last
        instead.
---
 .../flink/connector/base/source/reader/SourceReaderBase.java   | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index fb4e6df9..97a6a95 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -136,7 +136,10 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
                                return trace(InputStatus.MORE_AVAILABLE);
                        }
                        else if (!moveToNextSplit(recordsWithSplitId, output)) {
-                               return trace(finishedOrAvailableLater());
+                               // The fetch is done and we just discovered that and have not emitted anything, yet.
+                               // We need to move to the next fetch. As a shortcut, we call pollNext() here again,
+                               // rather than emitting nothing and waiting for the caller to call us again.
+                               return pollNext(output);
                        }
                        // else fall through the loop
                }
@@ -258,9 +261,8 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
        // ------------------ private helper methods ---------------------
 
        private InputStatus finishedOrAvailableLater() {
-               boolean allFetchersHaveShutdown = 
splitFetcherManager.maybeShutdownFinishedFetchers();
-               boolean allElementsEmitted = elementsQueue.isEmpty();
-               if (noMoreSplitsAssignment && allFetchersHaveShutdown && 
allElementsEmitted) {
+               final boolean allFetchersHaveShutdown = 
splitFetcherManager.maybeShutdownFinishedFetchers();
+               if (noMoreSplitsAssignment && allFetchersHaveShutdown && 
elementsQueue.isEmpty()) {
                        return InputStatus.END_OF_INPUT;
                } else {
                        return InputStatus.NOTHING_AVAILABLE;

Reply via email to