[ 
https://issues.apache.org/jira/browse/FLINK-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372706#comment-16372706
 ] 

ASF GitHub Bot commented on FLINK-8735:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5552#discussion_r169928328
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java ---
    @@ -468,61 +518,6 @@ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out)
                }
        }
     
    -   private static class CheckpointedUdfOperator
    -           extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
    -           implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    -           private static final long serialVersionUID = 1L;
    -
    -           private static final String CHECKPOINTED_STRING = "Oh my, that's nice!";
    -
    -           public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
    -                   super(userFunction);
    -           }
    -
    -           @Override
    -           public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
    -                   userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
    -           }
    -
    -           @Override
    -           public void processWatermark(Watermark mark) throws Exception {
    -                   output.emitWatermark(mark);
    -           }
    -   }
    -
    -   private static class CheckingRestoringUdfOperator
    -           extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
    -           implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    -
    -           private static final long serialVersionUID = 1L;
    -
    -           public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringUdfOperator.class + "_RESTORE_CHECK";
    -
    -           private String restoredState;
    -
    -           public CheckingRestoringUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
    -                   super(userFunction);
    -           }
    -
    -           @Override
    -           public void open() throws Exception {
    -                   super.open();
    -
    -                   getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
    -           }
    -
    -           @Override
    -           public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
    -                   userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
    -                   getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
    -           }
    -
    -           @Override
    -           public void processWatermark(Watermark mark) throws Exception {
    -                   output.emitWatermark(mark);
    -           }
    -   }
    -
        private static class TimelyStatefulOperator
                extends AbstractStreamOperator<Tuple2<Long, Long>>
    --- End diff --
    
    Code-style comment: for readability, I would suggest indenting the
    `extends`/`implements` lines by one more `tab`, which visually separates
    them from the class fields. This is a general comment; I put it here only
    because this spot makes it obvious how hard it can be to tell the
    `implements`/`extends` clauses apart from the
    `private static final long serialVersionUID = 1L;` that follows.


> Add savepoint migration ITCase that covers operator state
> ---------------------------------------------------------
>
>                 Key: FLINK-8735
>                 URL: https://issues.apache.org/jira/browse/FLINK-8735
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.5.0, 1.4.2
>
>
> The current {{StatefulJobSavepointMigrationITCase}} does not cover operator 
> state, meaning state accessed using {{OperatorStateStore}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
