[ 
https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121593#comment-16121593
 ] 

ASF GitHub Bot commented on FLINK-6244:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4320#discussion_r132446221
  
    --- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java 
---
    @@ -213,14 +321,97 @@
         * @return {@link DataStream} which contains the resulting elements 
from the pattern flat select
         *         function.
         */
    -   public <R> SingleOutputStreamOperator<R> flatSelect(final 
PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> 
outTypeInfo) {
    -           SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
    -                           
CEPOperatorUtils.createPatternStream(inputStream, pattern);
    -
    -           return patternStream.flatMap(
    -                   new PatternFlatSelectMapper<>(
    -                           
patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
    -                   )).returns(outTypeInfo);
    +   public <R> SingleOutputStreamOperator<R> flatSelect(
    +                   final PatternFlatSelectFunction<T, R> 
patternFlatSelectFunction,
    +                   final TypeInformation<R> outTypeInfo) {
    +           return CEPOperatorUtils.createPatternStream(
    +                   inputStream,
    +                   pattern,
    +                   clean(patternFlatSelectFunction),
    +                   outTypeInfo);
    +   }
    +
    +   /**
    +    * Applies a flat select function to the detected pattern sequence. For 
each pattern sequence the
    +    * provided {@link PatternFlatSelectFunction} is called. The pattern 
select function can produce
    +    * exactly one resulting element.
    +    *
    +    * <p>Applies a timeout function to a partial pattern sequence which 
has timed out. For each
    +    * partial pattern sequence the provided {@link 
PatternFlatTimeoutFunction} is called. The pattern
    +    * timeout function can produce exactly one resulting element.
    +    *
    +    * <p>You can get the stream of late data using
    +    * {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the
    +    * {@link SingleOutputStreamOperator} resulting from the select 
operation
    +    * with the same {@link OutputTag}.
    +    *
    +    * @param timeoutOutputTag {@link OutputTag} that identifies side 
output with timeouted patterns
    +    * @param patternFlatTimeoutFunction The pattern timeout function which 
is called for each partial
    +    *                               pattern sequence which has timed out.
    +    * @param patternFlatSelectFunction The pattern select function which 
is called for each detected
    +    *                              pattern sequence.
    +    * @param <L> Type of the resulting timeout elements
    +    * @param <R> Type of the resulting elements
    +    * @return {@link DataStream} which contains the resulting elements 
with the resulting timeout
    +    * elements in a side output.
    +    */
    +   public <L, R> SingleOutputStreamOperator<R> flatSelect(
    +           final OutputTag<L> timeoutOutputTag,
    +           final PatternFlatTimeoutFunction<T, L> 
patternFlatTimeoutFunction,
    +           final PatternFlatSelectFunction<T, R> 
patternFlatSelectFunction) {
    +
    +           TypeInformation<R> rightTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
    +                   patternFlatSelectFunction,
    +                   PatternFlatSelectFunction.class,
    +                   0,
    +                   1,
    +                   new int[]{0, 1, 0},
    +                   new int[]{1, 0},
    +                   inputStream.getType(),
    +                   null,
    +                   false);
    +
    +           return flatSelect(timeoutOutputTag, patternFlatTimeoutFunction, 
rightTypeInfo, patternFlatSelectFunction);
    +   }
    +
    +   /**
    +    * Applies a flat select function to the detected pattern sequence. For 
each pattern sequence the
    +    * provided {@link PatternFlatSelectFunction} is called. The pattern 
select function can produce
    +    * exactly one resulting element.
    +    *
    +    * <p>Applies a timeout function to a partial pattern sequence which 
has timed out. For each
    +    * partial pattern sequence the provided {@link 
PatternFlatTimeoutFunction} is called. The pattern
    +    * timeout function can produce exactly one resulting element.
    +    *
    +    * <p>You can get the stream of late data using
    --- End diff --
    
    Same as above.


> Emit timeouted Patterns as Side Output
> --------------------------------------
>
>                 Key: FLINK-6244
>                 URL: https://issues.apache.org/jira/browse/FLINK-6244
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>             Fix For: 1.4.0
>
>
> Now that we have SideOuputs I think timeouted patterns should be emitted into 
> them rather than producing a stream of `Either`



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to