[ https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121593#comment-16121593 ]
ASF GitHub Bot commented on FLINK-6244: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4320#discussion_r132446221 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java --- @@ -213,14 +321,97 @@ * @return {@link DataStream} which contains the resulting elements from the pattern flat select * function. */ - public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) { - SingleOutputStreamOperator<Map<String, List<T>>> patternStream = - CEPOperatorUtils.createPatternStream(inputStream, pattern); - - return patternStream.flatMap( - new PatternFlatSelectMapper<>( - patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction) - )).returns(outTypeInfo); + public <R> SingleOutputStreamOperator<R> flatSelect( + final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, + final TypeInformation<R> outTypeInfo) { + return CEPOperatorUtils.createPatternStream( + inputStream, + pattern, + clean(patternFlatSelectFunction), + outTypeInfo); + } + + /** + * Applies a flat select function to the detected pattern sequence. For each pattern sequence the + * provided {@link PatternFlatSelectFunction} is called. The pattern select function can produce + * exactly one resulting element. + * + * <p>Applies a timeout function to a partial pattern sequence which has timed out. For each + * partial pattern sequence the provided {@link PatternFlatTimeoutFunction} is called. The pattern + * timeout function can produce exactly one resulting element. + * + * <p>You can get the stream of late data using + * {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the + * {@link SingleOutputStreamOperator} resulting from the select operation + * with the same {@link OutputTag}. + * + * @param timeoutOutputTag {@link OutputTag} that identifies side output with timeouted patterns + * @param patternFlatTimeoutFunction The pattern timeout function which is called for each partial + * pattern sequence which has timed out. + * @param patternFlatSelectFunction The pattern select function which is called for each detected + * pattern sequence. + * @param <L> Type of the resulting timeout elements + * @param <R> Type of the resulting elements + * @return {@link DataStream} which contains the resulting elements with the resulting timeout + * elements in a side output. + */ + public <L, R> SingleOutputStreamOperator<R> flatSelect( + final OutputTag<L> timeoutOutputTag, + final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, + final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) { + + TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType( + patternFlatSelectFunction, + PatternFlatSelectFunction.class, + 0, + 1, + new int[]{0, 1, 0}, + new int[]{1, 0}, + inputStream.getType(), + null, + false); + + return flatSelect(timeoutOutputTag, patternFlatTimeoutFunction, rightTypeInfo, patternFlatSelectFunction); + } + + /** + * Applies a flat select function to the detected pattern sequence. For each pattern sequence the + * provided {@link PatternFlatSelectFunction} is called. The pattern select function can produce + * exactly one resulting element. + * + * <p>Applies a timeout function to a partial pattern sequence which has timed out. For each + * partial pattern sequence the provided {@link PatternFlatTimeoutFunction} is called. The pattern + * timeout function can produce exactly one resulting element. + * + * <p>You can get the stream of late data using --- End diff -- Same as above. > Emit timeouted Patterns as Side Output > -------------------------------------- > > Key: FLINK-6244 > URL: https://issues.apache.org/jira/browse/FLINK-6244 > Project: Flink > Issue Type: Improvement > Components: CEP > Affects Versions: 1.3.0 > Reporter: Dawid Wysakowicz > Assignee: Dawid Wysakowicz > Fix For: 1.4.0 > > > Now that we have SideOuputs I think timeouted patterns should be emitted into > them rather than producing a stream of `Either` -- This message was sent by Atlassian JIRA (v6.4.14#64029)