Dawid Wysakowicz created FLINK-10809:
----------------------------------------

             Summary: Using DataStreamUtils.reinterpretAsKeyedStream produces corrupted keyed state after restore
                 Key: FLINK-10809
                 URL: https://issues.apache.org/jira/browse/FLINK-10809
             Project: Flink
          Issue Type: Bug
          Components: DataStream API, State Backends, Checkpointing
    Affects Versions: 1.7.0
            Reporter: Dawid Wysakowicz


I've tried using {{DataStreamUtils.reinterpretAsKeyedStream}} on the results of a windowed aggregation:
{code}
DataStream<Tuple2<Integer, List<Event>>> eventStream4 = eventStream2.keyBy(Event::getKey)
        .window(SlidingEventTimeWindows.of(Time.milliseconds(150 * 3), Time.milliseconds(150)))
        .apply(new WindowFunction<Event, Tuple2<Integer, List<Event>>, Integer, TimeWindow>() {
            private static final long serialVersionUID = 3166250579972849440L;

            @Override
            public void apply(
                    Integer key, TimeWindow window, Iterable<Event> input,
                    Collector<Tuple2<Integer, List<Event>>> out) throws Exception {

                out.collect(Tuple2.of(key, StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList())));
            }
        });

DataStreamUtils.reinterpretAsKeyedStream(eventStream4, events -> events.f0)
        .flatMap(createSlidingWindowCheckMapper(pt))
        .addSink(new PrintSinkFunction<>());
{code}
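The window definition above uses a size of 150 * 3 = 450 ms and a slide of 150 ms, so every event should be assigned to 450 / 150 = 3 overlapping windows; these are the "3 consecutive windows" the check below relies on. The following plain-Java sketch (no Flink dependency) enumerates those windows for a timestamp. It assumes a window offset of zero and is only modeled on the assignment logic of sliding event-time windows, not copied from Flink; {{SlidingWindowAssignment}}, {{lastWindowStart}} and {{assignWindows}} are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not Flink API): lists the sliding windows a timestamp falls into,
 * assuming a window offset of zero.
 */
final class SlidingWindowAssignment {

    /** Start of the latest window that contains the timestamp (zero offset). */
    static long lastWindowStart(long timestamp, long slide) {
        // floorMod keeps the result aligned to the slide for negative timestamps too.
        return timestamp - Math.floorMod(timestamp, slide);
    }

    /** Returns every window [start, end) of the given size and slide that contains the timestamp. */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastWindowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 150 * 3; // window size from the report, in ms
        long slide = 150;    // slide from the report, in ms
        for (long ts : new long[] {0, 149, 150, 1000}) {
            StringBuilder line = new StringBuilder("ts=" + ts + " ->");
            for (long[] w : assignWindows(ts, size, slide)) {
                line.append(" [").append(w[0]).append(", ").append(w[1]).append(')');
            }
            System.out.println(line);
        }
    }
}
```

Each of the sample timestamps lands in exactly three windows; for timestamp 1000 the sketch prints {{ts=1000 -> [900, 1350) [750, 1200) [600, 1050)}}.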

Then, in {{createSlidingWindowCheckMapper}}, I verify that each event belongs to 3 consecutive windows; to do this I keep the contents of the last window in {{ValueState}}. In a failure-free run this check passes, but after a restore it misses a few windows at the beginning.

{code}
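import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Event is the job's own event type (not shown here).
public class SlidingWindowCheckMapper extends RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> {

    private static final long serialVersionUID = -744070793650644485L;

    /** This value state tracks previously seen events with the number of windows they appeared in. */
    private transient ValueState<List<Tuple2<Event, Integer>>> previousWindow;

    private final int slideFactor;

    SlidingWindowCheckMapper(int slideFactor) {
        this.slideFactor = slideFactor;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<List<Tuple2<Event, Integer>>> previousWindowDescriptor =
                new ValueStateDescriptor<>("previousWindow",
                        new ListTypeInfo<>(new TupleTypeInfo<>(TypeInformation.of(Event.class), BasicTypeInfo.INT_TYPE_INFO)));

        previousWindow = getRuntimeContext().getState(previousWindowDescriptor);
    }

    @Override
    public void flatMap(Tuple2<Integer, List<Event>> value, Collector<String> out) throws Exception {
        List<Tuple2<Event, Integer>> previousWindowValues =
                Optional.ofNullable(previousWindow.value()).orElseGet(Collections::emptyList);

        // Work on a copy so that value.f1 still holds the full window when it is printed in an alert.
        List<Event> newValues = new ArrayList<>(value.f1);

        // Events within one window must have consecutive sequence numbers.
        newValues.stream().reduce(new BinaryOperator<Event>() {
            @Override
            public Event apply(Event event, Event event2) {
                if (event2.getSequenceNumber() - 1 != event.getSequenceNumber()) {
                    out.collect("Alert: events in window out of order!");
                }

                return event2;
            }
        });

        // Every event from the previous window must reappear until it has been seen in slideFactor windows.
        List<Tuple2<Event, Integer>> newWindow = new ArrayList<>();
        for (Tuple2<Event, Integer> windowValue : previousWindowValues) {
            if (!newValues.contains(windowValue.f0)) {
                out.collect(String.format("Alert: event %s did not belong to %d consecutive windows. "
                                + "Event seen so far %d times. Current window: %s",
                        windowValue.f0,
                        slideFactor,
                        windowValue.f1,
                        value.f1));
            } else {
                newValues.remove(windowValue.f0);
                if (windowValue.f1 + 1 != slideFactor) {
                    newWindow.add(Tuple2.of(windowValue.f0, windowValue.f1 + 1));
                }
            }
        }

        // Events seen for the first time start with a count of 1.
        newValues.forEach(e -> newWindow.add(Tuple2.of(e, 1)));

        previousWindow.update(newWindow);
    }
}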
{code}
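The bookkeeping of the check does not depend on Flink, so it can be reproduced in plain Java. In the sketch below, a {{HashMap}} keyed by the window key stands in for the keyed {{ValueState}}, events are reduced to their sequence numbers, and each window spans three slides with one event per slide; these simplifications, and the names {{ConsecutiveWindowCheck}}, {{onWindow}}, {{dropState}}, {{window}} and {{countAlerts}}, are hypothetical and only for illustration. It also shows that if the state for a key is missing when a window arrives (simulated by {{dropState}}), the check raises alerts for events that did in fact appear in three consecutive windows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java model of the consecutive-window check. A HashMap keyed by the window
 * key stands in for Flink's keyed ValueState; events are reduced to their sequence numbers.
 */
final class ConsecutiveWindowCheck {

    private static final int SLIDE_FACTOR = 3;

    // Stand-in for keyed state: key -> (sequence number -> number of windows seen so far).
    private final Map<Integer, Map<Long, Integer>> state = new HashMap<>();

    /** Processes one window result for a key and returns the alerts it raises. */
    List<String> onWindow(int key, List<Long> window) {
        List<String> alerts = new ArrayList<>();
        Map<Long, Integer> previous = state.getOrDefault(key, new HashMap<>());
        Map<Long, Integer> next = new HashMap<>();
        List<Long> unseen = new ArrayList<>(window); // copy, so the input is not mutated
        for (Map.Entry<Long, Integer> entry : previous.entrySet()) {
            if (!unseen.remove(entry.getKey())) {
                alerts.add("event " + entry.getKey() + " seen in only " + entry.getValue() + " window(s)");
            } else if (entry.getValue() + 1 != SLIDE_FACTOR) {
                next.put(entry.getKey(), entry.getValue() + 1);
            }
        }
        for (Long seq : unseen) {
            next.put(seq, 1); // first appearance of this event
        }
        state.put(key, next);
        return alerts;
    }

    /** Simulates the state of a key being lost, e.g. by a faulty restore. */
    void dropState(int key) {
        state.remove(key);
    }

    /** Window w holds one event per slide: sequence numbers max(0, w - 2) .. w. */
    static List<Long> window(int w) {
        List<Long> events = new ArrayList<>();
        for (long seq = Math.max(0, w - SLIDE_FACTOR + 1); seq <= w; seq++) {
            events.add(seq);
        }
        return events;
    }

    /** Feeds the given number of windows for one key, optionally dropping its state first; returns the alert count. */
    static int countAlerts(int windows, int dropBeforeWindow) {
        ConsecutiveWindowCheck check = new ConsecutiveWindowCheck();
        int key = 42; // arbitrary key
        int alerts = 0;
        for (int w = 0; w < windows; w++) {
            if (w == dropBeforeWindow) {
                check.dropState(key);
            }
            alerts += check.onWindow(key, window(w)).size();
        }
        return alerts;
    }

    public static void main(String[] args) {
        System.out.println("alerts without state loss: " + countAlerts(10, -1));
        System.out.println("alerts with state lost before window 3: " + countAlerts(10, 3));
    }
}
```

Without state loss no alert is raised over ten windows; dropping the state before window 3 produces two alerts (for events 1 and 2) in the following two windows, after which the check settles again.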



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
