XComp commented on code in PR #21416:
URL: https://github.com/apache/flink/pull/21416#discussion_r1038158237


##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +100,48 @@ void testReaderCheckpoints() throws Exception {
                         new 
NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new 
NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = 
reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < numCycles; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            assertThat(reader.pollNext(out))
+                    .as(
+                            "Each poll should return a NOTHING_AVAILABLE 
status to explicitly trigger the availability check through in 
SourceReader.isAvailable")
+                    .isSameAs(InputStatus.NOTHING_AVAILABLE);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; 
elementInCycle++) {
+                assertThat(reader.isAvailable())
+                        .as(
+                                "There should be always data available because 
the test doesn't rely on any no rate-limiting strategy and splits are 
provided.")

Review Comment:
   ```suggestion
                                   "There should be always data available 
because the test utilizes no rate-limiting strategy and splits are provided.")
   ```



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +100,48 @@ void testReaderCheckpoints() throws Exception {
                         new 
NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new 
NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = 
reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < numCycles; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            assertThat(reader.pollNext(out))
+                    .as(
+                            "Each poll should return a NOTHING_AVAILABLE 
status to explicitly trigger the availability check through in 
SourceReader.isAvailable")
+                    .isSameAs(InputStatus.NOTHING_AVAILABLE);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; 
elementInCycle++) {
+                assertThat(reader.isAvailable())
+                        .as(
+                                "There should be always data available because 
the test doesn't rely on any no rate-limiting strategy and splits are 
provided.")
+                        .isCompleted();
+                // this never returns END_OF_INPUT because 
IteratorSourceReaderBase#pollNext does
+                // not immediately return END_OF_INPUT when the input is 
exhausted
+                assertThat(reader.pollNext(out))
+                        .as(
+                                "Each poll should return a NOTHING_AVAILABLE 
status to explicitly trigger the availability check through in 
SourceReader.isAvailable")
+                        .isSameAs(InputStatus.NOTHING_AVAILABLE);
+            }
+            // checkpoint
+            List<NumberSequenceSource.NumberSequenceSplit> splits = 
reader.snapshotState(1L);
+            // first cycle partially consumes the first split
+            // second cycle consumes the remaining first split and partially 
consumes the second
+            // third cycle consumes remaining second split
+            assertThat(splits).hasSize(numCycles - cycle - 1);
+
+            // re-create and restore
+            reader = createReader();
+            if (splits.isEmpty()) {
+                reader.notifyNoMoreSplits();
+            } else {
+                reader.addSplits(splits);
             }
         }
 
+        // we need to go again through isAvailable because 
IteratorSourceReaderBase#pollNext does
+        // not immediately return END_OF_INPUT when the input is exhausted
+        assertThat(reader.isAvailable())
+                .as(
+                        "There should be always data available because the 
test doesn't rely on any no rate-limiting strategy and splits are provided.")

Review Comment:
   ```suggestion
                           "There should be always data available because the 
test utilizes no rate-limiting strategy and splits are provided.")
   ```



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new 
NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new 
NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = 
reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; 
elementInCycle++) {
+                reader.isAvailable().get();
+                reader.pollNext(out);
+            }
+            // checkpoint
+            List<NumberSequenceSource.NumberSequenceSplit> splits = 
reader.snapshotState(1L);
+
+            // re-create and restore
+            reader = createReader();
+            if (splits.isEmpty()) {
+                reader.notifyNoMoreSplits();
+            } else {
+                reader.addSplits(splits);
             }
         }
 
+        reader.isAvailable().get();

Review Comment:
   ah true, it relies on 
[IteratorSourceReaderBase#noMoreSplits](https://github.com/apache/flink/blob/a5667e82e25cb87dc5523b82b08aec3e1408e9c6/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java#L136).
 Thanks for the clarification. :+1: 



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java:
##########
@@ -184,8 +183,9 @@ void testGatedRateLimiter() throws Exception {
 
         final GeneratorFunction<Long, Long> generatorFunction = index -> 1L;
 
+        int numCycles = 3;
         // Allow each subtask to produce at least 3 cycles, gated by 
checkpoints
-        int count = capacityPerCycle * 3;
+        int count = capacityPerCycle * numCycles;

Review Comment:
   FYI, in case you overlooked it. 8)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to