Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/5552#discussion_r169928328
--- Diff:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
---
@@ -468,61 +518,6 @@ public void flatMap(Tuple2<Long, Long> value,
Collector<Tuple2<Long, Long>> out)
}
}
- private static class CheckpointedUdfOperator
- extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
- implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
- private static final long serialVersionUID = 1L;
-
- private static final String CHECKPOINTED_STRING = "Oh my, that's nice!";
-
- public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
- super(userFunction);
- }
-
- @Override
- public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
- userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
- }
-
- private static class CheckingRestoringUdfOperator
- extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
- implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
- private static final long serialVersionUID = 1L;
-
- public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringUdfOperator.class + "_RESTORE_CHECK";
-
- private String restoredState;
-
- public CheckingRestoringUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
- super(userFunction);
- }
-
- @Override
- public void open() throws Exception {
- super.open();
-
- getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
- }
-
- @Override
- public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
- userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
- getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
- }
-
private static class TimelyStatefulOperator
extends AbstractStreamOperator<Tuple2<Long, Long>>
--- End diff --
Code-style comment: for readability, I would suggest indenting the
`extends`/`implements` lines by one more `tab`, so that they are visually
separated from the class fields. This is a general comment; I put it here
because this is where it is most obvious how hard it can be to tell the
`implements`/`extends` clauses apart from the following
`private static final long serialVersionUID = 1L;`
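
To illustrate, here is a minimal sketch of the two layouts. The class names
(`IndentationExample`, `Before`, `After`) and the doubling logic are
hypothetical and not taken from this PR; only the indentation of the
`implements` line is the point.

```java
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical example class; the names are illustrative only.
public class IndentationExample {

	// Current style: the `implements` clause sits at the same indentation
	// level as the class body, so it visually runs into the first field.
	static class Before
		implements Function<Long, Long>, Serializable {
		private static final long serialVersionUID = 1L;

		@Override
		public Long apply(Long value) {
			return value * 2;
		}
	}

	// Suggested style: the `implements` clause is indented by one more tab,
	// which sets it apart from the fields that follow.
	static class After
			implements Function<Long, Long>, Serializable {

		private static final long serialVersionUID = 1L;

		@Override
		public Long apply(Long value) {
			return value * 2;
		}
	}

	public static void main(String[] args) {
		// Both classes behave identically; only the source layout differs.
		System.out.println(new Before().apply(21L));
		System.out.println(new After().apply(21L));
	}
}
```

The same extra tab would apply to an `extends` line when a class has both
clauses.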
---