XComp commented on code in PR #21416:
URL: https://github.com/apache/flink/pull/21416#discussion_r1037924554


##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                reader.isAvailable().get();
+                reader.pollNext(out);

Review Comment:
   ```suggestion
               assertThat(reader.pollNext(out))
                       .as(
                               "Each poll should return a NOTHING_AVAILABLE status to explicitly trigger the availability check through SourceReader.isAvailable")
                       .isEqualTo(InputStatus.NOTHING_AVAILABLE);
               for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
                   reader.isAvailable().get();
                   assertThat(reader.pollNext(out))
                           .as(
                                   "Each poll should return a NOTHING_AVAILABLE status to explicitly trigger the availability check through SourceReader.isAvailable")
                           .isEqualTo(InputStatus.NOTHING_AVAILABLE);
   ```
   I feel like we should also add an assertion here, with a description explaining why `NOTHING_AVAILABLE` is expected. Otherwise, it's rather unintuitive that `NOTHING_AVAILABLE` is returned even though data is actually available. WDYT?
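To illustrate the behavior such an assertion would document, here is a dependency-free sketch. It assumes, as the comments in this review suggest, that the reader gates every record on `isAvailable()` (the place where a rate limiter would block), so `pollNext` reports `NOTHING_AVAILABLE` even while more data exists. `ToyGatedReader`, `Status` and `drain` are hypothetical stand-ins, not the Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class NothingAvailableSketch {

    // Subset of the values of Flink's InputStatus, redeclared so the sketch has no dependencies.
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    // Hypothetical reader that gates every record on isAvailable(), the place where a
    // rate limiter would block. It therefore reports NOTHING_AVAILABLE after each record.
    static final class ToyGatedReader {
        private long next;
        private final long last;
        private boolean permitGranted = false;
        private int availabilityChecks = 0;

        ToyGatedReader(long from, long to) {
            this.next = from;
            this.last = to;
        }

        // A real rate limiter could return an incomplete future; the toy grants a permit at once.
        CompletableFuture<Void> isAvailable() {
            availabilityChecks++;
            permitGranted = true;
            return CompletableFuture.completedFuture(null);
        }

        Status pollNext(List<Long> out) {
            if (next > last) {
                return Status.END_OF_INPUT;
            }
            if (!permitGranted) {
                // Data exists, but no permit yet: ask the caller to go through isAvailable().
                return Status.NOTHING_AVAILABLE;
            }
            out.add(next++);
            permitGranted = false; // one permit covers exactly one record
            return Status.NOTHING_AVAILABLE;
        }

        int availabilityChecks() {
            return availabilityChecks;
        }
    }

    // Drives the reader the way a runtime loop might: wait on isAvailable() whenever
    // NOTHING_AVAILABLE is returned. join() is used instead of get() only to avoid checked exceptions.
    static List<Long> drain(ToyGatedReader reader) {
        List<Long> out = new ArrayList<>();
        Status status;
        while ((status = reader.pollNext(out)) != Status.END_OF_INPUT) {
            if (status == Status.NOTHING_AVAILABLE) {
                reader.isAvailable().join();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        ToyGatedReader reader = new ToyGatedReader(1, 5);
        List<Long> out = drain(reader);
        // prints: [1, 2, 3, 4, 5] after 6 availability checks
        System.out.println(out + " after " + reader.availabilityChecks() + " availability checks");
    }
}
```

Under these assumptions even the very first poll returns `NOTHING_AVAILABLE` before anything is emitted, which is the surprise the suggested `as(...)` description would spell out.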



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                reader.isAvailable().get();
+                reader.pollNext(out);
+            }
+            // checkpoint
+            List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);

Review Comment:
   Considering the test's name, should we also assert here that the right 
splits are returned in each cycle? :thinking: 
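One way such a per-cycle check could look, sketched under a simplified model: splits are inclusive number ranges consumed in order, and the expected snapshot is computed independently by skipping the elements already emitted. `Split`, `ToyReader` and `expectedRemaining` are hypothetical; in particular, putting the in-progress split first in the snapshot is an assumption of the toy, not a statement about how the Flink reader orders its checkpointed state.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SplitSnapshotSketch {

    // Simplified stand-in for NumberSequenceSource.NumberSequenceSplit: an inclusive range.
    record Split(String id, long from, long to) {}

    // Hypothetical toy reader that consumes splits in order and can snapshot what is left.
    static final class ToyReader {
        private final Deque<Split> pending = new ArrayDeque<>();

        ToyReader(List<Split> splits) {
            pending.addAll(splits);
        }

        // Emits up to n numbers, trimming the head split as it goes.
        List<Long> read(int n) {
            List<Long> out = new ArrayList<>();
            while (out.size() < n && !pending.isEmpty()) {
                Split head = pending.pollFirst();
                out.add(head.from());
                if (head.from() < head.to()) {
                    pending.addFirst(new Split(head.id(), head.from() + 1, head.to()));
                }
            }
            return out;
        }

        // The toy "checkpoint": everything that has not been emitted yet.
        List<Split> snapshotState() {
            return new ArrayList<>(pending);
        }
    }

    // Independent expectation: drop the first `consumed` numbers from the original splits.
    static List<Split> expectedRemaining(List<Split> original, long consumed) {
        List<Split> result = new ArrayList<>();
        long toSkip = consumed;
        for (Split s : original) {
            long size = s.to() - s.from() + 1;
            if (toSkip >= size) {
                toSkip -= size; // split fully consumed
            } else {
                result.add(new Split(s.id(), s.from() + toSkip, s.to()));
                toSkip = 0;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Split> original = List.of(new Split("split-1", 1, 5), new Split("split-2", 6, 10));
        int elementsPerCycle = 3;
        ToyReader reader = new ToyReader(original);
        for (int cycle = 0; cycle < 3; cycle++) {
            reader.read(elementsPerCycle);
            List<Split> splits = reader.snapshotState();
            List<Split> expected = expectedRemaining(original, (long) (cycle + 1) * elementsPerCycle);
            if (!splits.equals(expected)) {
                throw new AssertionError("cycle " + cycle + ": expected " + expected + " but got " + splits);
            }
            reader = new ToyReader(splits); // re-create and restore
        }
        // prints: remaining after 3 cycles: [Split[id=split-2, from=10, to=10]]
        System.out.println("remaining after 3 cycles: " + reader.snapshotState());
    }
}
```

Computing the expectation from the original splits, rather than from the previous snapshot, keeps the check independent of the reader's own bookkeeping.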



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                reader.isAvailable().get();
+                reader.pollNext(out);
+            }
+            // checkpoint
+            List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
+
+            // re-create and restore
+            reader = createReader();
+            if (splits.isEmpty()) {
+                reader.notifyNoMoreSplits();
+            } else {
+                reader.addSplits(splits);
             }
         }
 
+        reader.isAvailable().get();

Review Comment:
   ```suggestion
   ```
   Is this really a requirement? [StreamTask#processInput:561](https://github.com/apache/flink/blob/5601da7cc251eac479fe24167c0f58dbd963072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L561) shows that the runtime no longer loops over `isAvailable` once the end of data is reached. Also, from a logical standpoint, I would expect `isAvailable` to complete when we reach the end of the data stream. :thinking: 
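A minimal sketch of this argument, assuming a simplified driver loop that, as described for `StreamTask#processInput`, only waits on `isAvailable()` after `NOTHING_AVAILABLE` and stops at `END_OF_INPUT`. The `ToyReader` and `drive` names are hypothetical, and the toy always hands out completed futures; the relevant points are that the loop terminates without a trailing availability wait, and that a reader whose input is exhausted should return an already-completed future anyway.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class EndOfInputSketch {

    // Subset of the values of Flink's InputStatus, redeclared so the sketch has no dependencies.
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    // Hypothetical reader that asks for an availability check before every record.
    static final class ToyReader {
        private final Iterator<Integer> data;
        private boolean permit = false;
        private int availabilityCalls = 0;

        ToyReader(List<Integer> values) {
            this.data = values.iterator();
        }

        Status pollNext(List<Integer> out) {
            if (!data.hasNext()) {
                return Status.END_OF_INPUT;
            }
            if (!permit) {
                return Status.NOTHING_AVAILABLE;
            }
            permit = false;
            out.add(data.next());
            // The toy reports END_OF_INPUT together with the last record.
            return data.hasNext() ? Status.NOTHING_AVAILABLE : Status.END_OF_INPUT;
        }

        CompletableFuture<Void> isAvailable() {
            availabilityCalls++;
            if (data.hasNext()) {
                permit = true; // the toy grants a permit immediately
            }
            // Past the end of input there is nothing to wait for, so the future is complete too.
            return CompletableFuture.completedFuture(null);
        }

        int availabilityCalls() {
            return availabilityCalls;
        }
    }

    // Simplified driver loop: it waits on isAvailable() only after NOTHING_AVAILABLE
    // and stops as soon as END_OF_INPUT is reported.
    static List<Integer> drive(ToyReader reader) {
        List<Integer> out = new ArrayList<>();
        Status status;
        while ((status = reader.pollNext(out)) != Status.END_OF_INPUT) {
            if (status == Status.NOTHING_AVAILABLE) {
                reader.isAvailable().join();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        ToyReader reader = new ToyReader(List.of(10, 20, 30));
        List<Integer> out = drive(reader);
        int callsDuringDrive = reader.availabilityCalls();
        // An extra check after the end, like the line under review, gets an already-done future.
        boolean doneAtEnd = reader.isAvailable().isDone();
        // prints: [10, 20, 30], calls=3, doneAtEnd=true
        System.out.println(out + ", calls=" + callsDuringDrive + ", doneAtEnd=" + doneAtEnd);
    }
}
```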



##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
                         new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
                         new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
 
-        long remainingInCycle = elementsPerCycle;
-        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
-            if (--remainingInCycle <= 0) {
-                remainingInCycle = elementsPerCycle;
-                // checkpoint
-                List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
-                // re-create and restore
-                reader = createReader();
-                if (splits.isEmpty()) {
-                    reader.notifyNoMoreSplits();
-                } else {
-                    reader.addSplits(splits);
-                }
+        for (int cycle = 0; cycle < 3; cycle++) {
+            // this call is not required but mimics what happens at runtime
+            reader.pollNext(out);
+            for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+                reader.isAvailable().get();

Review Comment:
   ```suggestion
                   assertThat(reader.isAvailable())
                           .as(
                                   "There should always be data available because the test doesn't rely on any rate-limiting strategy and splits are provided.")
                           .isCompleted();
   ```
   nit: I would use `assertj` functionality here. `get()` might imply that we're waiting for some concurrent operation to finish, but there is no concurrent behavior here.
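For context on the nit: an AssertJ-style `isCompleted()` check inspects the future's state, whereas `get()` blocks until completion, so an unexpectedly pending future would hang the test instead of failing it. The sketch below approximates that state check with the JDK alone; `assertCompleted` and `state` are hypothetical helpers, not AssertJ API, and their exact semantics may differ from AssertJ's.

```java
import java.util.concurrent.CompletableFuture;

public class CompletionAssertionSketch {

    // Hypothetical helper approximating an AssertJ-style isCompleted(): it inspects the
    // future's state without blocking, so an incomplete future fails immediately.
    static void assertCompleted(CompletableFuture<?> future, String description) {
        if (!"completed".equals(state(future))) {
            throw new AssertionError(description + " (future state: " + state(future) + ")");
        }
    }

    // Hypothetical helper describing the state of a future without waiting on it.
    static String state(CompletableFuture<?> future) {
        if (!future.isDone()) {
            return "incomplete";
        }
        if (future.isCancelled()) {
            return "cancelled";
        }
        if (future.isCompletedExceptionally()) {
            return "completed exceptionally";
        }
        return "completed";
    }

    public static void main(String[] args) {
        assertCompleted(CompletableFuture.completedFuture(null), "an already completed future passes");

        CompletableFuture<Void> pending = new CompletableFuture<>();
        try {
            // Calling pending.get() here would block forever; the state check fails fast.
            assertCompleted(pending, "There should always be data available");
            throw new IllegalStateException("expected an AssertionError");
        } catch (AssertionError expected) {
            // prints: failed fast: There should always be data available (future state: incomplete)
            System.out.println("failed fast: " + expected.getMessage());
        }
    }
}
```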



-- 
This is an automated message from the Apache Git Service.