zentol commented on code in PR #21416:
URL: https://github.com/apache/flink/pull/21416#discussion_r1038124348


##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                reader.isAvailable().get();
+                reader.pollNext(out);
+            }
+            // checkpoint
+            List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
+
+            // re-create and restore
+            reader = createReader();
+            if (splits.isEmpty()) {
+                reader.notifyNoMoreSplits();
+            } else {
+                reader.addSplits(splits);
             }
         }
 
+        reader.isAvailable().get();

Review Comment:
   yes and no.
   
   > we're not looping over isAvailable anymore if reaching the end of data
   
   Indeed, but the devil is in the details. Whether we reached the end of data isn't based on the input being exhausted, but on the source returning `END_OF_INPUT` from a call to `pollNext()`.
   And `IteratorSourceReaderBase#pollNext` (which the data generator source uses internally) doesn't return `END_OF_INPUT` when the input is exhausted, but only when _another_ call is made to `pollNext()` while the input is _already_ exhausted (which the source reader interface allows).
   
   That's why we need this additional call; the last call to `pollNext()` within the loop does not return `END_OF_INPUT` (as you expected, I suppose) but `NOTHING_AVAILABLE`, and so we have to call it again.
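
To make the sequence of statuses concrete, here is a minimal, self-contained sketch of that behavior. It does not use Flink at all: `ToyReader`, `Status`, and `pollTimes` are hypothetical names invented for illustration, and the toy only assumes what is described above, namely that the call emitting the last element returns a non-terminal status and that `END_OF_INPUT` is reported only by a later call. The real `IteratorSourceReaderBase` also handles splits and availability futures, which are left out here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class EndOfInputSketch {

    /** Simplified stand-in for the three statuses discussed above. */
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    /** Hypothetical toy reader; NOT the real IteratorSourceReaderBase. */
    static final class ToyReader {
        private final Iterator<Long> iterator;

        ToyReader(List<Long> elements) {
            this.iterator = elements.iterator();
        }

        Status pollNext(List<Long> out) {
            if (iterator.hasNext()) {
                out.add(iterator.next());
                // Emitting the final element still yields a non-terminal status.
                return iterator.hasNext() ? Status.MORE_AVAILABLE : Status.NOTHING_AVAILABLE;
            }
            // Only a call made while the input is already exhausted reports the end.
            return Status.END_OF_INPUT;
        }
    }

    /** Polls exactly n times and records the status returned by each call. */
    static List<Status> pollTimes(ToyReader reader, int n, List<Long> out) {
        List<Status> statuses = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            statuses.add(reader.pollNext(out));
        }
        return statuses;
    }

    public static void main(String[] args) {
        List<Long> out = new ArrayList<>();
        ToyReader reader = new ToyReader(List.of(1L, 2L, 3L));
        // Three elements, three polls: the input is now exhausted ...
        System.out.println(pollTimes(reader, 3, out)); // [MORE_AVAILABLE, MORE_AVAILABLE, NOTHING_AVAILABLE]
        // ... but END_OF_INPUT only shows up on the extra call.
        System.out.println(reader.pollNext(out));      // END_OF_INPUT
        System.out.println(out);                       // [1, 2, 3]
    }
}
```

A loop that polls exactly as many times as there are elements therefore never observes `END_OF_INPUT` in this model, which is why one more call is needed after the loop.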
   
   It's all a bit finicky because the source reader API isn't intuitive in the slightest.
   
   (Fixing this in `IteratorSourceReaderBase` isn't trivial :/)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
