zentol commented on code in PR #21416:
URL: https://github.com/apache/flink/pull/21416#discussion_r1038131318
##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java:
##########
@@ -99,23 +99,28 @@ void testReaderCheckpoints() throws Exception {
new
NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
new
NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
- long remainingInCycle = elementsPerCycle;
- while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
- if (--remainingInCycle <= 0) {
- remainingInCycle = elementsPerCycle;
- // checkpoint
- List<NumberSequenceSource.NumberSequenceSplit> splits =
reader.snapshotState(1L);
-
- // re-create and restore
- reader = createReader();
- if (splits.isEmpty()) {
- reader.notifyNoMoreSplits();
- } else {
- reader.addSplits(splits);
- }
+ for (int cycle = 0; cycle < 3; cycle++) {
+ // this call is not required but mimics what happens at runtime
+ reader.pollNext(out);
+ for (int elementInCycle = 0; elementInCycle < elementsPerCycle;
elementInCycle++) {
+ reader.isAvailable().get();
+ reader.pollNext(out);
+ }
+ // checkpoint
+ List<NumberSequenceSource.NumberSequenceSplit> splits =
reader.snapshotState(1L);
Review Comment:
We can do that, yes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]