[ https://issues.apache.org/jira/browse/FLINK-13148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16900582#comment-16900582 ]
Congxian Qiu(klion26) commented on FLINK-13148:
-----------------------------------------------

[~JaryZhen] thanks for the reminder. We need to reach an agreement with [~kkl0u] on this before implementation.

Could you please have a look at this, [~kkl0u]?

> Expose WindowedStream.sideOutputLateData() from CoGroupedStreams
> ----------------------------------------------------------------
>
>                 Key: FLINK-13148
>                 URL: https://issues.apache.org/jira/browse/FLINK-13148
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Congxian Qiu(klion26)
>            Assignee: Congxian Qiu(klion26)
>            Priority: Major
>
> FLINK-10050 added support for {{allowedLateness}}, but we cannot get the side
> output containing the late data. This issue wants to fix that.
> For the implementation, I want to add an {{OutputTag}} parameter to the
> {{WithWindow}} constructor, as follows:
> {code:java}
> protected WithWindow(DataStream<T1> input1,
>         DataStream<T2> input2,
>         KeySelector<T1, KEY> keySelector1,
>         KeySelector<T2, KEY> keySelector2,
>         TypeInformation<KEY> keyType,
>         WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
>         Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
>         Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
>         Time allowedLateness,
>         OutputTag<TaggedUnion<T1, T2>> outputTag) {
>     ...
> }
> {code}
> and add a method {{sideOutputLateData(OutputTag<TaggedUnion<T1, T2>> outputTag)}}
> to {{WithWindow}}:
> {code:java}
> public WithWindow<T1, T2, KEY, W> sideOutputLateData(OutputTag<TaggedUnion<T1, T2>> outputTag) {
>     ...
> }
> {code}
> {{WithWindow#apply}} will then set the {{outputTag}} on the windowed stream if
> it is not null:
> {code:java}
> public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
>     ...
>     if (outputTag != null) {
>         windowedStream.sideOutputLateData(outputTag);
>     }
>     ...
> }
> {code}
> The same will apply to {{JoinedStreams}}.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)