XComp commented on code in PR #21416:
URL: https://github.com/apache/flink/pull/21416#discussion_r1037924554
##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
- long remainingInCycle = elementsPerCycle;
- while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
- if (--remainingInCycle <= 0) {
- remainingInCycle = elementsPerCycle;
- // checkpoint
- List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
- // re-create and restore
- reader = createReader();
- if (splits.isEmpty()) {
- reader.notifyNoMoreSplits();
- } else {
- reader.addSplits(splits);
- }
+ for (int cycle = 0; cycle < 3; cycle++) {
+ // this call is not required but mimics what happens at runtime
+ reader.pollNext(out);
+ for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+ reader.isAvailable().get();
+ reader.pollNext(out);
Review Comment:
```suggestion
assertThat(reader.pollNext(out))
        .as(
                "Each poll should return a NOTHING_AVAILABLE status to explicitly "
                        + "trigger the availability check through SourceReader.isAvailable")
        .isEqualTo(InputStatus.NOTHING_AVAILABLE);
for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
    reader.isAvailable().get();
    assertThat(reader.pollNext(out))
            .as(
                    "Each poll should return a NOTHING_AVAILABLE status to explicitly "
                            + "trigger the availability check through SourceReader.isAvailable")
            .isEqualTo(InputStatus.NOTHING_AVAILABLE);
```
I feel like we should also add an assertion here, with a description that
explains why `NOTHING_AVAILABLE` is expected. Otherwise, it's rather
unintuitive that `NOTHING_AVAILABLE` is returned even though data is actually
available. WDYT?
##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
- long remainingInCycle = elementsPerCycle;
- while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
- if (--remainingInCycle <= 0) {
- remainingInCycle = elementsPerCycle;
- // checkpoint
- List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
- // re-create and restore
- reader = createReader();
- if (splits.isEmpty()) {
- reader.notifyNoMoreSplits();
- } else {
- reader.addSplits(splits);
- }
+ for (int cycle = 0; cycle < 3; cycle++) {
+ // this call is not required but mimics what happens at runtime
+ reader.pollNext(out);
+ for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+ reader.isAvailable().get();
+ reader.pollNext(out);
+ }
+ // checkpoint
+ List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
Review Comment:
Considering the test's name, should we also assert here that the right
splits are returned in each cycle? :thinking:
##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
- long remainingInCycle = elementsPerCycle;
- while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
- if (--remainingInCycle <= 0) {
- remainingInCycle = elementsPerCycle;
- // checkpoint
- List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
- // re-create and restore
- reader = createReader();
- if (splits.isEmpty()) {
- reader.notifyNoMoreSplits();
- } else {
- reader.addSplits(splits);
- }
+ for (int cycle = 0; cycle < 3; cycle++) {
+ // this call is not required but mimics what happens at runtime
+ reader.pollNext(out);
+ for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+ reader.isAvailable().get();
+ reader.pollNext(out);
+ }
+ // checkpoint
+ List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
+
+ // re-create and restore
+ reader = createReader();
+ if (splits.isEmpty()) {
+ reader.notifyNoMoreSplits();
+ } else {
+ reader.addSplits(splits);
}
}
+ reader.isAvailable().get();
Review Comment:
```suggestion
```
Is this really a requirement?
[StreamTask#processInput:561](https://github.com/apache/flink/blob/5601da7cc251eac479fe24167c0f58dbd963072f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L561)
shows that we no longer loop over `isAvailable` once we reach the end of the
data. Also, from a logical standpoint, I would expect `isAvailable` to complete
once we reach the end of the data stream. :thinking:
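To illustrate what I mean, here is a toy model of such a poll loop. It assumes
the caller only waits on availability after a `NOTHING_AVAILABLE` status and
stops as soon as it sees `END_OF_INPUT`. The `PollLoopSketch` class, its
`Status` enum and the `run` method are made up for this comment and are not the
actual `StreamTask` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of a poll loop. All names are hypothetical; this is not Flink's StreamTask. */
final class PollLoopSketch {

    /** Simplified stand-in for the input status values discussed in this review. */
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    /**
     * Replays a scripted sequence of poll results and records the calls the loop makes.
     * The script must end with END_OF_INPUT, otherwise remove() throws on an empty queue.
     */
    static List<String> run(Queue<Status> scriptedStatuses) {
        List<String> calls = new ArrayList<>();
        while (true) {
            Status status = scriptedStatuses.remove();
            calls.add("pollNext -> " + status);
            if (status == Status.END_OF_INPUT) {
                // The loop ends here without consulting availability again.
                return calls;
            }
            if (status == Status.NOTHING_AVAILABLE) {
                // Availability is only consulted when the last poll returned nothing.
                calls.add("isAvailable");
            }
        }
    }
}
```

In this model the last recorded call is always the poll that returned
`END_OF_INPUT`, so a trailing `isAvailable` call in the test would have no
counterpart at runtime.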
##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
- long remainingInCycle = elementsPerCycle;
- while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
- if (--remainingInCycle <= 0) {
- remainingInCycle = elementsPerCycle;
- // checkpoint
- List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(1L);
-
- // re-create and restore
- reader = createReader();
- if (splits.isEmpty()) {
- reader.notifyNoMoreSplits();
- } else {
- reader.addSplits(splits);
- }
+ for (int cycle = 0; cycle < 3; cycle++) {
+ // this call is not required but mimics what happens at runtime
+ reader.pollNext(out);
+ for (int elementInCycle = 0; elementInCycle < elementsPerCycle; elementInCycle++) {
+ reader.isAvailable().get();
Review Comment:
```suggestion
assertThat(reader.isAvailable())
        .as(
                "There should always be data available because the test doesn't rely "
                        + "on any rate-limiting strategy and splits are provided.")
        .isCompleted();
```
nit: I would use AssertJ functionality. `get()` might imply that we're
waiting for concurrent work to finish, but we don't have concurrent
behavior here.
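A minimal, JDK-only sketch of the difference, assuming the availability future
is a plain `CompletableFuture`: checking completion state never blocks, whereas
`get()` would wait on a pending future. The `AvailabilityCheckSketch` class and
its `isCompletedNormally` helper are hypothetical names, used here as a rough
stand-in for what I understand the AssertJ assertion to check.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper for illustration only; not part of Flink or AssertJ. */
final class AvailabilityCheckSketch {

    /** Returns true only if the future is already done and did not fail or get cancelled. */
    static boolean isCompletedNormally(CompletableFuture<?> availability) {
        // Neither call blocks, unlike get(), which waits until the future completes.
        return availability.isDone() && !availability.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        // Stand-ins for the future returned by reader.isAvailable() in the test.
        CompletableFuture<Void> available = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> pending = new CompletableFuture<>();

        System.out.println("available: " + isCompletedNormally(available));
        System.out.println("pending: " + isCompletedNormally(pending));
    }
}
```

With a check like this, a future that is unexpectedly still pending fails the
test right away instead of hanging on `get()`.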
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.