[ https://issues.apache.org/jira/browse/FLINK-10809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16678454#comment-16678454 ]

ASF GitHub Bot commented on FLINK-10809:
----------------------------------------

StefanRRichter opened a new pull request #7048: [FLINK-10809][state] Include keyed state that is not from head operators in state assignment
URL: https://github.com/apache/flink/pull/7048
   
   ## What is the purpose of the change
   
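   This PR includes keyed state that does not come from a head operator (the first operator of an operator chain) in the state assignment. This fixes restoring keyed state for operators that come after `DataStreamUtils.reinterpretAsKeyedStream`. Such operators can be chained behind another keyed operator instead of heading their own chain.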
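   For background: on restore, keyed state is handed to subtasks in contiguous key-group ranges, and those ranges are recomputed whenever the parallelism changes. The plain-Java sketch below assumes the range arithmetic of Flink's `KeyGroupRangeAssignment` (`computeKeyGroupRangeForOperatorIndex` / `computeOperatorIndexForKeyGroup`). It also assumes an illustrative max parallelism of 128. The class `KeyGroupRanges` is hypothetical and not part of this PR.

   ```java
   /**
    * Hypothetical sketch: which key groups each subtask owns, assuming the arithmetic
    * of Flink's KeyGroupRangeAssignment. Plain Java, no Flink dependency.
    */
   public class KeyGroupRanges {

       /** Inclusive [start, end] key-group range owned by one subtask. */
       static int[] rangeForOperatorIndex(int maxParallelism, int parallelism, int operatorIndex) {
           int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
           int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
           return new int[] {start, end};
       }

       /** The subtask that owns a given key group. */
       static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
           return keyGroup * parallelism / maxParallelism;
       }

       public static void main(String[] args) {
           int maxParallelism = 128;
           // Compare the ranges before (parallelism 2) and after (parallelism 3) rescaling.
           for (int parallelism : new int[] {2, 3}) {
               for (int subtask = 0; subtask < parallelism; subtask++) {
                   int[] range = rangeForOperatorIndex(maxParallelism, parallelism, subtask);
                   System.out.printf("parallelism=%d subtask=%d -> key groups [%d, %d]%n",
                           parallelism, subtask, range[0], range[1]);
               }
           }
       }
   }
   ```

   With parallelism 3, subtask 1 owns key groups [43, 85]. That range straddles both ranges from parallelism 2 ([0, 63] and [64, 127]). Every operator that holds keyed state needs its slice of those key groups, whether or not it is the head of its chain.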
   
   
   ## Brief change log
   
   Remove the check in the state assignment algorithm that only considered keyed state from head operators. This check was an optimization from the time when Flink only allowed keyed state in head operators (such as the operator that follows every `keyBy`).
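   The effect of the removed check can be pictured with a toy model. This is not the code of Flink's state assignment, and `HeadOnlyAssignmentToy` and `ChainedOperator` are hypothetical names. Each operator in a chain lists the key groups it has keyed state for, and a subtask owning key groups [0, 63] gets back the ones in its range. With `headOnly = true` (the old optimization), a non-head operator gets nothing back in this model. An example is a `flatMap` chained after `reinterpretAsKeyedStream`.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   /** Toy model, not Flink's actual state assignment code. */
   public class HeadOnlyAssignmentToy {

       /** Hypothetical stand-in for one chained operator and the key groups its keyed state covers. */
       static final class ChainedOperator {
           final boolean head;
           final List<Integer> keyGroupsWithState;

           ChainedOperator(boolean head, List<Integer> keyGroupsWithState) {
               this.head = head;
               this.keyGroupsWithState = keyGroupsWithState;
           }
       }

       /**
        * Returns, per operator in chain order, the key groups in [start, end] it gets back.
        * headOnly = true models the removed check; false models the behaviour after the change.
        */
       static List<List<Integer>> assign(List<ChainedOperator> chain, int start, int end, boolean headOnly) {
           List<List<Integer>> perOperator = new ArrayList<>();
           for (ChainedOperator op : chain) {
               List<Integer> restored = new ArrayList<>();
               if (!headOnly || op.head) {
                   for (int keyGroup : op.keyGroupsWithState) {
                       if (keyGroup >= start && keyGroup <= end) {
                           restored.add(keyGroup);
                       }
                   }
               }
               perOperator.add(restored);
           }
           return perOperator;
       }

       public static void main(String[] args) {
           // Assumed chain from the issue: window operator (chain head) -> flatMap (non-head).
           List<ChainedOperator> chain = Arrays.asList(
                   new ChainedOperator(true, Arrays.asList(3, 40, 90)),
                   new ChainedOperator(false, Arrays.asList(3, 40, 90)));
           System.out.println("head only:     " + assign(chain, 0, 63, true));  // [[3, 40], []]
           System.out.println("all operators: " + assign(chain, 0, 63, false)); // [[3, 40], [3, 40]]
       }
   }
   ```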
   
   
   ## Verifying this change
   
   Extended `ReinterpretDataStreamAsKeyedStreamITCase` with a recovery cycle to test proper state restore of non-head operators.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 


> Using DataStreamUtils.reinterpretAsKeyedStream produces corrupted keyed state after restore
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-10809
>                 URL: https://issues.apache.org/jira/browse/FLINK-10809
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, State Backends, Checkpointing
>    Affects Versions: 1.7.0
>            Reporter: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>
> I tried using {{DataStreamUtils.reinterpretAsKeyedStream}} on the results of a windowed aggregation:
> {code}
> DataStream<Tuple2<Integer, List<Event>>> eventStream4 = eventStream2.keyBy(Event::getKey)
>     .window(SlidingEventTimeWindows.of(Time.milliseconds(150 * 3), Time.milliseconds(150)))
>     .apply(new WindowFunction<Event, Tuple2<Integer, List<Event>>, Integer, TimeWindow>() {
>         private static final long serialVersionUID = 3166250579972849440L;
>
>         @Override
>         public void apply(
>                 Integer key, TimeWindow window, Iterable<Event> input,
>                 Collector<Tuple2<Integer, List<Event>>> out) throws Exception {
>             // Emit the key together with all events of the window.
>             out.collect(Tuple2.of(key, StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList())));
>         }
>     });
>
> // The window output is still partitioned by key, so it is reinterpreted as keyed without another shuffle.
> DataStreamUtils.reinterpretAsKeyedStream(eventStream4, events -> events.f0)
>     .flatMap(createSlidingWindowCheckMapper(pt))
>     .addSink(new PrintSinkFunction<>());
> {code}
> Then, in the mapper created by createSlidingWindowCheckMapper, I verify that each event belongs to 3 consecutive windows. To do so, I keep the events of the previous window, together with the number of windows each has appeared in, in a ValueState. Without failures this check passes, but after a restore it misses a few windows at the beginning.
> {code}
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
> import java.util.Optional;
> import java.util.function.BinaryOperator;
>
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.typeutils.ListTypeInfo;
> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.util.Collector;
>
> public class SlidingWindowCheckMapper extends RichFlatMapFunction<Tuple2<Integer, List<Event>>, String> {
>     private static final long serialVersionUID = -744070793650644485L;
>
>     /** This value state tracks previously seen events with the number of windows they appeared in. */
>     private transient ValueState<List<Tuple2<Event, Integer>>> previousWindow;
>
>     private final int slideFactor;
>
>     SlidingWindowCheckMapper(int slideFactor) {
>         this.slideFactor = slideFactor;
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         ValueStateDescriptor<List<Tuple2<Event, Integer>>> previousWindowDescriptor =
>             new ValueStateDescriptor<>("previousWindow",
>                 new ListTypeInfo<>(new TupleTypeInfo<>(TypeInformation.of(Event.class), BasicTypeInfo.INT_TYPE_INFO)));
>         previousWindow = getRuntimeContext().getState(previousWindowDescriptor);
>     }
>
>     @Override
>     public void flatMap(Tuple2<Integer, List<Event>> value, Collector<String> out) throws Exception {
>         List<Tuple2<Event, Integer>> previousWindowValues =
>             Optional.ofNullable(previousWindow.value()).orElseGet(Collections::emptyList);
>         // Copy, so that removing matched events below does not alter value.f1, which is printed in alerts.
>         List<Event> newValues = new ArrayList<>(value.f1);
>         // Within one window, sequence numbers must be consecutive.
>         newValues.stream().reduce(new BinaryOperator<Event>() {
>             @Override
>             public Event apply(Event event, Event event2) {
>                 if (event2.getSequenceNumber() - 1 != event.getSequenceNumber()) {
>                     out.collect("Alert: events in window out of order!");
>                 }
>                 return event2;
>             }
>         });
>         List<Tuple2<Event, Integer>> newWindow = new ArrayList<>();
>         for (Tuple2<Event, Integer> windowValue : previousWindowValues) {
>             if (!newValues.contains(windowValue.f0)) {
>                 out.collect(String.format("Alert: event %s did not belong to %d consecutive windows. Event seen so far %d times. Current window: %s",
>                     windowValue.f0, slideFactor, windowValue.f1, value.f1));
>             } else {
>                 newValues.remove(windowValue.f0);
>                 // Keep tracking the event until it has appeared in slideFactor windows.
>                 if (windowValue.f1 + 1 != slideFactor) {
>                     newWindow.add(Tuple2.of(windowValue.f0, windowValue.f1 + 1));
>                 }
>             }
>         }
>         // Events seen for the first time start with a count of 1.
>         newValues.forEach(e -> newWindow.add(Tuple2.of(e, 1)));
>         previousWindow.update(newWindow);
>     }
> }
> {code}
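The consecutive-window check described above can be reproduced without Flink to see what missing state looks like. The sketch makes these assumptions:
- Integer sequence numbers stand in for `Event`.
- Window `w` holds events `w-2..w` for a slide factor of 3.
- Clearing the state before one window stands in for keyed state that was not restored.

The class `SlidingWindowCheckModel` is hypothetical and mirrors only the bookkeeping of `flatMap`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical Flink-free model of the bookkeeping in SlidingWindowCheckMapper. */
public class SlidingWindowCheckModel {
    static final int SLIDE_FACTOR = 3;

    /** Window w holds the SLIDE_FACTOR latest sequence numbers up to w (none below 0). */
    static List<Integer> window(int w) {
        List<Integer> events = new ArrayList<>();
        for (int e = Math.max(0, w - SLIDE_FACTOR + 1); e <= w; e++) {
            events.add(e);
        }
        return events;
    }

    /** State maps event -> number of windows seen so far; returns the number of alerts. */
    static int check(Map<Integer, Integer> state, List<Integer> window) {
        List<Integer> newValues = new ArrayList<>(window);
        Map<Integer, Integer> next = new LinkedHashMap<>();
        int alerts = 0;
        for (Map.Entry<Integer, Integer> seen : state.entrySet()) {
            if (!newValues.contains(seen.getKey())) {
                alerts++; // the event left before appearing in SLIDE_FACTOR windows
            } else {
                newValues.remove(seen.getKey()); // Integer argument: remove(Object), not remove(int)
                if (seen.getValue() + 1 != SLIDE_FACTOR) {
                    next.put(seen.getKey(), seen.getValue() + 1);
                }
            }
        }
        for (Integer e : newValues) {
            next.put(e, 1);
        }
        state.clear();
        state.putAll(next);
        return alerts;
    }

    /** Processes windows 0..lastWindow; the state is wiped just before window lossAt (-1 for no loss). */
    static int run(int lastWindow, int lossAt) {
        Map<Integer, Integer> state = new LinkedHashMap<>();
        int alerts = 0;
        for (int w = 0; w <= lastWindow; w++) {
            if (w == lossAt) {
                state.clear();
            }
            alerts += check(state, window(w));
        }
        return alerts;
    }

    public static void main(String[] args) {
        System.out.println("alerts without failure: " + run(20, -1));                 // 0
        System.out.println("alerts with state lost before window 5: " + run(20, 5)); // 2
    }
}
```

Without a failure, every event appears in exactly 3 consecutive windows and no alert fires. When the state is cleared before window 5, events 3 and 4 restart their count at 1. The check then reports that they did not appear in 3 consecutive windows, producing 2 alerts right after the simulated restore and none afterwards.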



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
