[ https://issues.apache.org/jira/browse/FLINK-10809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nico Kruber reopened FLINK-10809:
---------------------------------

> Using DataStreamUtils.reinterpretAsKeyedStream produces corrupted keyed state after restore
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-10809
>                 URL: https://issues.apache.org/jira/browse/FLINK-10809
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, State Backends, Checkpointing
>    Affects Versions: 1.5.5, 1.6.2, 1.7.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.3, 1.7.0
>
>
> I've tried using {{DataStreamUtils.reinterpretAsKeyedStream}} for the results of a windowed aggregation:
> {code}
> DataStream<Tuple2<Integer, List<Event>>> eventStream4 = eventStream2.keyBy(Event::getKey)
> 	.window(SlidingEventTimeWindows.of(Time.milliseconds(150 * 3), Time.milliseconds(150)))
> 	.apply(new WindowFunction<Event, Tuple2<Integer, List<Event>>, Integer, TimeWindow>() {
> 		private static final long serialVersionUID = 3166250579972849440L;
> 
> 		@Override
> 		public void apply(
> 				Integer key, TimeWindow window, Iterable<Event> input,
> 				Collector<Tuple2<Integer, List<Event>>> out) throws Exception {
> 			out.collect(Tuple2.of(key,
> 				StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList())));
> 		}
> 	});
> 
> DataStreamUtils.reinterpretAsKeyedStream(eventStream4, events -> events.f0)
> 	.flatMap(createSlidingWindowCheckMapper(pt))
> 	.addSink(new PrintSinkFunction<>());
> {code}
> and then in {{createSlidingWindowCheckMapper}} I verify that each event belongs to 3 consecutive windows, for which I keep the contents of the last window in {{ValueState}}. In a non-failure setup this check runs fine, but after a restore it misses a few windows at the beginning.
> {code}
> public class SlidingWindowCheckMapper extends RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> {
> 
> 	private static final long serialVersionUID = -744070793650644485L;
> 
> 	/** This value state tracks previously seen events with the number of windows they appeared in. */
> 	private transient ValueState<List<Tuple2<Event, Integer>>> previousWindow;
> 
> 	private final int slideFactor;
> 
> 	SlidingWindowCheckMapper(int slideFactor) {
> 		this.slideFactor = slideFactor;
> 	}
> 
> 	@Override
> 	public void open(Configuration parameters) throws Exception {
> 		ValueStateDescriptor<List<Tuple2<Event, Integer>>> previousWindowDescriptor =
> 			new ValueStateDescriptor<>("previousWindow",
> 				new ListTypeInfo<>(new TupleTypeInfo<>(TypeInformation.of(Event.class), BasicTypeInfo.INT_TYPE_INFO)));
> 		previousWindow = getRuntimeContext().getState(previousWindowDescriptor);
> 	}
> 
> 	@Override
> 	public void flatMap(Tuple2<Integer, List<Event>> value, Collector<String> out) throws Exception {
> 		List<Tuple2<Event, Integer>> previousWindowValues =
> 			Optional.ofNullable(previousWindow.value()).orElseGet(Collections::emptyList);
> 		List<Event> newValues = value.f1;
> 
> 		// check that the events within the window are in sequence-number order
> 		newValues.stream().reduce(new BinaryOperator<Event>() {
> 			@Override
> 			public Event apply(Event event, Event event2) {
> 				if (event2.getSequenceNumber() - 1 != event.getSequenceNumber()) {
> 					out.collect("Alert: events in window out of order!");
> 				}
> 				return event2;
> 			}
> 		});
> 
> 		// match the events remembered from the previous window against the current one
> 		List<Tuple2<Event, Integer>> newWindow = new ArrayList<>();
> 		for (Tuple2<Event, Integer> windowValue : previousWindowValues) {
> 			if (!newValues.contains(windowValue.f0)) {
> 				out.collect(String.format(
> 					"Alert: event %s did not belong to %d consecutive windows. Event seen so far %d times. Current window: %s",
> 					windowValue.f0,
> 					slideFactor,
> 					windowValue.f1,
> 					value.f1));
> 			} else {
> 				newValues.remove(windowValue.f0);
> 				if (windowValue.f1 + 1 != slideFactor) {
> 					newWindow.add(Tuple2.of(windowValue.f0, windowValue.f1 + 1));
> 				}
> 			}
> 		}
> 		newValues.forEach(e -> newWindow.add(Tuple2.of(e, 1)));
> 		previousWindow.update(newWindow);
> 	}
> }
> {code}
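>
> For comparison, a minimal sketch (reusing {{eventStream4}}, {{createSlidingWindowCheckMapper}} and {{pt}} from above) of the same check with an explicit {{keyBy}}, which re-partitions the stream over the network instead of reinterpreting its existing partitioning:
> {code}
> // Comparison sketch only: key the check via an explicit keyBy() shuffle
> // rather than DataStreamUtils.reinterpretAsKeyedStream.
> eventStream4
> 	.keyBy(events -> events.f0)
> 	.flatMap(createSlidingWindowCheckMapper(pt))
> 	.addSink(new PrintSinkFunction<>());
> {code}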



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
