zentol commented on code in PR #21416:
URL: https://github.com/apache/flink/pull/21416#discussion_r1035881699
##########
flink-connectors/flink-connector-datagen/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java:
##########
@@ -195,40 +195,31 @@ void testGatedRateLimiter() throws Exception {
final DataStreamSource<Long> streamSource =
env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator");
- final DataStream<Tuple2<Integer, Long>> map =
- streamSource.map(new SubtaskAndCheckpointMapper());
- final List<Tuple2<Integer, Long>> results = map.executeAndCollect(1000);
-
- final Map<Tuple2<Integer, Long>, Integer> collect =
- results.stream()
- .collect(
- Collectors.groupingBy(
- x -> (new Tuple2<>(x.f0, x.f1)), summingInt(x -> 1)));
- for (Map.Entry<Tuple2<Integer, Long>, Integer> entry : collect.entrySet()) {
- assertThat(entry.getValue()).isEqualTo(capacityPerSubtaskPerCycle);
- }
+ final DataStream<Long> map = streamSource.flatMap(new FirstCheckpointFilter());
+ final List<Long> results = map.executeAndCollect(1000);
+
+ assertThat(results).hasSize(capacityPerCycle);
}
- private static class SubtaskAndCheckpointMapper
- extends RichMapFunction<Long, Tuple2<Integer, Long>> implements CheckpointListener {
+ private static class FirstCheckpointFilter
+ implements FlatMapFunction<Long, Long>, CheckpointedFunction {
- private long checkpointId = 0;
- private int subtaskIndex;
+ private volatile boolean firstCheckpoint = true;
@Override
- public void open(Configuration parameters) {
- subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+ public void flatMap(Long value, Collector<Long> out) throws Exception {
+ if (firstCheckpoint) {
+ out.collect(value);
+ }
}
@Override
- public Tuple2<Integer, Long> map(Long value) {
- return new Tuple2<>(subtaskIndex, checkpointId);
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
Review Comment:
What we want is for at most X records to be emitted per checkpoint:
`emit|emit|snapshot|emit|emit|snapshot`
In the ideal world, this happens:
`emit|emit|snapshot|notify|emit|emit|...`
But since `notifyCheckpointComplete` can arrive at arbitrary times, or not
at all, there are some edge cases.
We may not emit anything for one checkpoint if the RPC is lost:
`emit|emit|snapshot|snapshot|notify|emit|emit|...`
We may emit more than X records per checkpoint if the RPC is late:
`emit|emit|snapshot|snapshot|notify|emit|notify|emit|emit|...`
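The traces above can be checked mechanically. The following is a minimal sketch, not code from the PR: a hypothetical helper, `emitsPerCheckpoint`, splits a trace written in the `|` notation used here and counts the `emit` events between consecutive `snapshot` events. The class name, the method name, and the choice of X = 2 are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; it is not part of Flink or of this PR.
final class CheckpointTraceCounter {

    /**
     * Counts the "emit" events in each interval delimited by "snapshot" events.
     * The last element covers the open interval after the final snapshot.
     */
    static List<Integer> emitsPerCheckpoint(String trace) {
        List<Integer> counts = new ArrayList<>();
        int current = 0;
        for (String event : trace.split("\\|")) {
            switch (event) {
                case "emit":
                    current++;
                    break;
                case "snapshot":
                    // A snapshot closes the current interval.
                    counts.add(current);
                    current = 0;
                    break;
                default:
                    // "notify" and the trailing "..." do not affect the count.
                    break;
            }
        }
        counts.add(current);
        return counts;
    }

    public static void main(String[] args) {
        // Ideal case: every interval holds X = 2 records.
        System.out.println(emitsPerCheckpoint(
                "emit|emit|snapshot|notify|emit|emit|...")); // [2, 2]
        // Lost notification: one interval holds no records.
        System.out.println(emitsPerCheckpoint(
                "emit|emit|snapshot|snapshot|notify|emit|emit|...")); // [2, 0, 2]
        // Late notification: one interval holds more than X records.
        System.out.println(emitsPerCheckpoint(
                "emit|emit|snapshot|snapshot|notify|emit|notify|emit|emit|...")); // [2, 0, 3]
    }
}
```

With X = 2, the lost-RPC trace contains an interval with 0 records and the late-RPC trace contains one with 3, so in these edge cases the per-checkpoint count is not exactly X.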