XComp commented on code in PR #21416:
URL: https://github.com/apache/flink/pull/21416#discussion_r1038158237
##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +100,48 @@ void testReaderCheckpoints() throws Exception {
new
NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
new
NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
- long remainingInCycle = elementsPerCycle;
- while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
- if (--remainingInCycle <= 0) {
- remainingInCycle = elementsPerCycle;
- // checkpoint
- List<NumberSequenceSource.NumberSequenceSplit> splits =
reader.snapshotState(1L);
-
- // re-create and restore
- reader = createReader();
- if (splits.isEmpty()) {
- reader.notifyNoMoreSplits();
- } else {
- reader.addSplits(splits);
- }
+ for (int cycle = 0; cycle < numCycles; cycle++) {
+ // this call is not required but mimics what happens at runtime
+ assertThat(reader.pollNext(out))
+ .as(
+ "Each poll should return a NOTHING_AVAILABLE
status to explicitly trigger the availability check through in
SourceReader.isAvailable")
+ .isSameAs(InputStatus.NOTHING_AVAILABLE);
+ for (int elementInCycle = 0; elementInCycle < elementsPerCycle;
elementInCycle++) {
+ assertThat(reader.isAvailable())
+ .as(
+ "There should be always data available because
the test doesn't rely on any no rate-limiting strategy and splits are
provided.")
Review Comment:
```suggestion
"There should be always data available
because the test utilizes no rate-limiting strategy and splits are provided.")
```
##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +100,48 @@ void testReaderCheckpoints() throws Exception {
new
NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
new
NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
- long remainingInCycle = elementsPerCycle;
- while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
- if (--remainingInCycle <= 0) {
- remainingInCycle = elementsPerCycle;
- // checkpoint
- List<NumberSequenceSource.NumberSequenceSplit> splits =
reader.snapshotState(1L);
-
- // re-create and restore
- reader = createReader();
- if (splits.isEmpty()) {
- reader.notifyNoMoreSplits();
- } else {
- reader.addSplits(splits);
- }
+ for (int cycle = 0; cycle < numCycles; cycle++) {
+ // this call is not required but mimics what happens at runtime
+ assertThat(reader.pollNext(out))
+ .as(
+ "Each poll should return a NOTHING_AVAILABLE
status to explicitly trigger the availability check through in
SourceReader.isAvailable")
+ .isSameAs(InputStatus.NOTHING_AVAILABLE);
+ for (int elementInCycle = 0; elementInCycle < elementsPerCycle;
elementInCycle++) {
+ assertThat(reader.isAvailable())
+ .as(
+ "There should be always data available because
the test doesn't rely on any no rate-limiting strategy and splits are
provided.")
+ .isCompleted();
+ // this never returns END_OF_INPUT because
IteratorSourceReaderBase#pollNext does
+ // not immediately return END_OF_INPUT when the input is
exhausted
+ assertThat(reader.pollNext(out))
+ .as(
+ "Each poll should return a NOTHING_AVAILABLE
status to explicitly trigger the availability check through in
SourceReader.isAvailable")
+ .isSameAs(InputStatus.NOTHING_AVAILABLE);
+ }
+ // checkpoint
+ List<NumberSequenceSource.NumberSequenceSplit> splits =
reader.snapshotState(1L);
+ // first cycle partially consumes the first split
+ // second cycle consumes the remaining first split and partially
consumes the second
+ // third cycle consumes remaining second split
+ assertThat(splits).hasSize(numCycles - cycle - 1);
+
+ // re-create and restore
+ reader = createReader();
+ if (splits.isEmpty()) {
+ reader.notifyNoMoreSplits();
+ } else {
+ reader.addSplits(splits);
}
}
+ // we need to go again through isAvailable because
IteratorSourceReaderBase#pollNext does
+ // not immediately return END_OF_INPUT when the input is exhausted
+ assertThat(reader.isAvailable())
+ .as(
+ "There should be always data available because the
test doesn't rely on any no rate-limiting strategy and splits are provided.")
Review Comment:
```suggestion
"There should be always data available because the
test utilizes no rate-limiting strategy and splits are provided.")
```
##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
new
NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
new
NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
- long remainingInCycle = elementsPerCycle;
- while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
- if (--remainingInCycle <= 0) {
- remainingInCycle = elementsPerCycle;
- // checkpoint
- List<NumberSequenceSource.NumberSequenceSplit> splits =
reader.snapshotState(1L);
-
- // re-create and restore
- reader = createReader();
- if (splits.isEmpty()) {
- reader.notifyNoMoreSplits();
- } else {
- reader.addSplits(splits);
- }
+ for (int cycle = 0; cycle < 3; cycle++) {
+ // this call is not required but mimics what happens at runtime
+ reader.pollNext(out);
+ for (int elementInCycle = 0; elementInCycle < elementsPerCycle;
elementInCycle++) {
+ reader.isAvailable().get();
+ reader.pollNext(out);
+ }
+ // checkpoint
+ List<NumberSequenceSource.NumberSequenceSplit> splits =
reader.snapshotState(1L);
+
+ // re-create and restore
+ reader = createReader();
+ if (splits.isEmpty()) {
+ reader.notifyNoMoreSplits();
+ } else {
+ reader.addSplits(splits);
}
}
+ reader.isAvailable().get();
Review Comment:
Ah, true: it relies on
[IteratorSourceReaderBase#noMoreSplits](https://github.com/apache/flink/blob/a5667e82e25cb87dc5523b82b08aec3e1408e9c6/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java#L136).
Thanks for the clarification. :+1:
##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java:
##########
@@ -184,8 +183,9 @@ void testGatedRateLimiter() throws Exception {
final GeneratorFunction<Long, Long> generatorFunction = index -> 1L;
+ int numCycles = 3;
// Allow each subtask to produce at least 3 cycles, gated by
checkpoints
- int count = capacityPerCycle * 3;
+ int count = capacityPerCycle * numCycles;
Review Comment:
FYI, in case you overlooked it. 8)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]