[ 
https://issues.apache.org/jira/browse/FLINK-10809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683716#comment-16683716
 ] 

ASF GitHub Bot commented on FLINK-10809:
----------------------------------------

tzulitai commented on a change in pull request #7048: [FLINK-10809][state] 
Include keyed state that is not from head operat…
URL: https://github.com/apache/flink/pull/7048#discussion_r232637615
 
 

 ##########
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java
 ##########
 @@ -187,26 +208,71 @@ public void run(SourceContext<Tuple2<Integer, Integer>> out) throws Exception {
                        this.running = true;
                        try {
                                while (running) {
-                                       Integer key = din.readInt();
-                                       Integer val = din.readInt();
-                                       out.collect(new Tuple2<>(key, val));
+
+                                       checkFail();
+
+                                       synchronized (out.getCheckpointLock()) {
+                                               Integer key = din.readInt();
+                                               Integer val = din.readInt();
+                                               out.collect(new Tuple2<>(key, val));
+
+                                               position += 2 * Integer.BYTES;
+                                       }
                                }
                        } catch (EOFException ignore) {
+                               while (!isRestored) {
 
 Review comment:
   Shouldn't the `checkFail` call alone be enough, then? The `isRestored` flag is already checked there.
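For context, the pattern the diff introduces (read and emit a record atomically under a lock, advancing a byte offset in the same critical section so a snapshot never observes a half-applied record) can be modeled without any Flink dependencies. This is a hypothetical sketch: `LockedReaderSketch`, `readAll`, and the plain `Object` lock standing in for the checkpoint lock are all invented names, not Flink API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class LockedReaderSketch {

    /**
     * Reads (key, value) int pairs until EOF. Each pair is read, emitted, and
     * the offset advanced inside one synchronized block, mirroring how the
     * source in the diff keeps emission and position consistent with respect
     * to a concurrent checkpoint.
     */
    public static List<int[]> readAll(DataInputStream din, Object checkpointLock) throws IOException {
        List<int[]> out = new ArrayList<>();
        long position = 0;
        try {
            while (true) {
                synchronized (checkpointLock) {
                    int key = din.readInt();
                    int val = din.readInt();
                    out.add(new int[] {key, val});
                    position += 2 * Integer.BYTES; // offset moves together with the emit
                }
            }
        } catch (EOFException ignore) {
            // end of input: the loop exits with a consistent (records, position) pair
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream dout = new DataOutputStream(buf);
        dout.writeInt(1);
        dout.writeInt(10);
        dout.writeInt(2);
        dout.writeInt(20);
        List<int[]> records = readAll(
                new DataInputStream(new ByteArrayInputStream(buf.toByteArray())), new Object());
        System.out.println(records.size()); // prints 2
    }
}
```

A checkpoint taken while holding the same lock would then always see a position that matches exactly the records already emitted.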

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Using DataStreamUtils.reinterpretAsKeyedStream produces corrupted keyed state 
> after restore
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-10809
>                 URL: https://issues.apache.org/jira/browse/FLINK-10809
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, State Backends, Checkpointing
>    Affects Versions: 1.7.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>
> I've tried using {{DataStreamUtils.reinterpretAsKeyedStream}} on the results of a windowed aggregation:
> {code}
> 		DataStream<Tuple2<Integer, List<Event>>> eventStream4 = eventStream2.keyBy(Event::getKey)
> 			.window(SlidingEventTimeWindows.of(Time.milliseconds(150 * 3), Time.milliseconds(150)))
> 			.apply(new WindowFunction<Event, Tuple2<Integer, List<Event>>, Integer, TimeWindow>() {
> 				private static final long serialVersionUID = 3166250579972849440L;
> 
> 				@Override
> 				public void apply(
> 					Integer key, TimeWindow window, Iterable<Event> input,
> 					Collector<Tuple2<Integer, List<Event>>> out) throws Exception {
> 					out.collect(Tuple2.of(key, StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList())));
> 				}
> 			});
> 
> 		DataStreamUtils.reinterpretAsKeyedStream(eventStream4, events -> events.f0)
> 			.flatMap(createSlidingWindowCheckMapper(pt))
> 			.addSink(new PrintSinkFunction<>());
> {code}
> In {{createSlidingWindowCheckMapper}} I then verify that each event belongs to 3 consecutive windows, keeping the contents of the last window in {{ValueState}}. In a non-failure setup this check passes, but after a restore it misses a few windows at the beginning.
> {code}
> public class SlidingWindowCheckMapper extends RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> {
> 
> 	private static final long serialVersionUID = -744070793650644485L;
> 
> 	/** This value state tracks previously seen events with the number of windows they appeared in. */
> 	private transient ValueState<List<Tuple2<Event, Integer>>> previousWindow;
> 
> 	private final int slideFactor;
> 
> 	SlidingWindowCheckMapper(int slideFactor) {
> 		this.slideFactor = slideFactor;
> 	}
> 
> 	@Override
> 	public void open(Configuration parameters) throws Exception {
> 		ValueStateDescriptor<List<Tuple2<Event, Integer>>> previousWindowDescriptor =
> 			new ValueStateDescriptor<>("previousWindow",
> 				new ListTypeInfo<>(new TupleTypeInfo<>(TypeInformation.of(Event.class), BasicTypeInfo.INT_TYPE_INFO)));
> 		previousWindow = getRuntimeContext().getState(previousWindowDescriptor);
> 	}
> 
> 	@Override
> 	public void flatMap(Tuple2<Integer, List<Event>> value, Collector<String> out) throws Exception {
> 		List<Tuple2<Event, Integer>> previousWindowValues =
> 			Optional.ofNullable(previousWindow.value()).orElseGet(Collections::emptyList);
> 		List<Event> newValues = value.f1;
> 		newValues.stream().reduce(new BinaryOperator<Event>() {
> 			@Override
> 			public Event apply(Event event, Event event2) {
> 				if (event2.getSequenceNumber() - 1 != event.getSequenceNumber()) {
> 					out.collect("Alert: events in window out of order!");
> 				}
> 				return event2;
> 			}
> 		});
> 		List<Tuple2<Event, Integer>> newWindow = new ArrayList<>();
> 		for (Tuple2<Event, Integer> windowValue : previousWindowValues) {
> 			if (!newValues.contains(windowValue.f0)) {
> 				out.collect(String.format("Alert: event %s did not belong to %d consecutive windows. Event seen so far %d times. Current window: %s",
> 					windowValue.f0,
> 					slideFactor,
> 					windowValue.f1,
> 					value.f1));
> 			} else {
> 				newValues.remove(windowValue.f0);
> 				if (windowValue.f1 + 1 != slideFactor) {
> 					newWindow.add(Tuple2.of(windowValue.f0, windowValue.f1 + 1));
> 				}
> 			}
> 		}
> 		newValues.forEach(e -> newWindow.add(Tuple2.of(e, 1)));
> 		previousWindow.update(newWindow);
> 	}
> }
> {code}
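The per-event bookkeeping in the quoted mapper can be modeled without Flink state, which makes the expected invariant easy to check in isolation. This is a hypothetical sketch: `WindowCheckSketch` and `nextWindow` are invented names, and plain Java collections stand in for {{ValueState}}; the logic mirrors the quoted `flatMap` (an event must appear in `slideFactor` consecutive windows and is dropped from tracking once it does).

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class WindowCheckSketch {

    /**
     * Computes the tracking state for the next window. Events carried over
     * from `previous` must reappear in `newValues`; otherwise an alert is
     * recorded. An event reaching `slideFactor` appearances is dropped.
     */
    public static List<Map.Entry<String, Integer>> nextWindow(
            List<Map.Entry<String, Integer>> previous,
            List<String> newValues,
            int slideFactor,
            List<String> alerts) {
        List<String> remaining = new ArrayList<>(newValues);
        List<Map.Entry<String, Integer>> next = new ArrayList<>();
        for (Map.Entry<String, Integer> seen : previous) {
            if (!remaining.remove(seen.getKey())) {
                // tracked event vanished before completing slideFactor windows
                alerts.add("event " + seen.getKey() + " missing after " + seen.getValue() + " windows");
            } else if (seen.getValue() + 1 != slideFactor) {
                next.add(new AbstractMap.SimpleEntry<>(seen.getKey(), seen.getValue() + 1));
            }
        }
        // events appearing for the first time start at count 1
        for (String e : remaining) {
            next.add(new AbstractMap.SimpleEntry<>(e, 1));
        }
        return next;
    }

    public static void main(String[] args) {
        List<String> alerts = new ArrayList<>();
        List<Map.Entry<String, Integer>> w = new ArrayList<>();
        w = nextWindow(w, Arrays.asList("a"), 3, alerts);      // a: 1
        w = nextWindow(w, Arrays.asList("a", "b"), 3, alerts); // a: 2, b: 1
        w = nextWindow(w, Arrays.asList("a", "b"), 3, alerts); // a hits 3 and is dropped; b: 2
        System.out.println(w.size() + " " + alerts.size());    // prints: 1 0
    }
}
```

On a restore that silently skips windows, the carried-over events fail the `remaining.remove` check and alerts accumulate, which is exactly the symptom the issue reports.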



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
