kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r612363186



##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
##########
@@ -71,40 +70,67 @@
      */
     @Nullable private SplitT currentSplit;
 
-    /** The remaining splits. Null means no splits have yet been assigned. */
-    @Nullable private Queue<SplitT> remainingSplits;
+    /** The remaining splits that were assigned but not yet processed. */
+    private Queue<SplitT> remainingSplits;
+
+    private boolean noMoreSplits;
 
     public IteratorSourceReader(SourceReaderContext context) {
         this.context = checkNotNull(context);
         this.availability = new CompletableFuture<>();
+        this.remainingSplits = new ArrayDeque<>();
     }
 
     // ------------------------------------------------------------------------
 
     @Override
     public void start() {
-        // request a split only if we did not get one during restore
-        if (remainingSplits == null) {
+        // request a split if we don't have one
+        if (remainingSplits.isEmpty()) {
             context.sendSplitRequest();
         }
     }
 
     @Override
     public InputStatus pollNext(ReaderOutput<E> output) {
-        if (iterator != null && iterator.hasNext()) {
-            output.collect(iterator.next());
+        if (iterator != null) {
+            if (iterator.hasNext()) {
+                output.collect(iterator.next());
+                return InputStatus.MORE_AVAILABLE;
+            } else {
+                finishSplit();
+            }
+        }
+
+        return tryMoveToNextSplit();
+    }
+
+    private void finishSplit() {
+        iterator = null;
+        currentSplit = null;
+
+        // request another split if no other is left
+        // we do this only here in the finishSplit part to avoid requesting a split
+        // whenever the reader is polled and doesn't currently have a split
+        if (remainingSplits.isEmpty() && !noMoreSplits) {

Review comment:
       Apparently I overlooked the comment 😅.




