[ https://issues.apache.org/jira/browse/FLINK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15878218#comment-15878218 ]

ASF GitHub Bot commented on FLINK-5845:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3375#discussion_r102462053
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java ---
    @@ -127,16 +134,36 @@
                                        keySerializer,
                                        nfaFactory));
                } else {
    -                   patternStream = inputStream.transform(
    +
    +                   KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
    +                   TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE;
    +
    +                   patternStream = inputStream.keyBy(new NullByteKeySelector<T>()).transform(
                                "TimeoutCEPPatternOperator",
                                eitherTypeInformation,
    -                           new TimeoutCEPPatternOperator<>(
    +                           new TimeoutKeyedCEPPatternOperator<>(
                                        inputSerializer,
                                        isProcessingTime,
    +                                   keySelector,
    +                                   keySerializer,
                                        nfaFactory
                                )).forceNonParallel();
                }
     
                return patternStream;
        }
    +
    +   /**
    +    * Used as dummy KeySelector to allow using WindowOperator for Non-Keyed Windows.
    --- End diff --
    
    Does not seem to fit here. Copy & paste artifact?


> CEP: unify key and non-keyed operators
> --------------------------------------
>
>                 Key: FLINK-5845
>                 URL: https://issues.apache.org/jira/browse/FLINK-5845
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.3.0
>
>
> Currently the keyed and non-keyed operators in the CEP library have different 
> implementations. This issue aims to unify them into one. 
> The new implementation will always be applied to a keyed stream. In 
> non-keyed use cases, the input stream will be keyed on a dummy key, as is 
> done in the {{DataStream.windowAll()}} method, where the input 
> stream is keyed using the {{NullByteKeySelector}}.
> This is a first step towards making the CEP operators rescalable.
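
The dummy-key idea in the description above can be illustrated with a small standalone sketch. It does not use Flink: the KeySelector interface and the groupByKey helper are hypothetical stand-ins, and NullByteKeySelector is re-declared locally under the assumption that it maps every element to the constant key 0. The point is only that a constant key places all elements in a single group, so a keyed operator can serve a non-keyed stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DummyKeySketch {

    /** Hypothetical stand-in for a key selector interface (not the Flink class). */
    interface KeySelector<IN, KEY> {
        KEY getKey(IN value);
    }

    /** Assumed behavior of a dummy selector: every element gets the same key, 0. */
    static final class NullByteKeySelector<T> implements KeySelector<T, Byte> {
        @Override
        public Byte getKey(T value) {
            return (byte) 0;
        }
    }

    /** Toy analogue of keying a stream: group the elements by their selected key. */
    static <T, K> Map<K, List<T>> groupByKey(List<T> input, KeySelector<T, K> selector) {
        Map<K, List<T>> groups = new HashMap<>();
        for (T element : input) {
            groups.computeIfAbsent(selector.getKey(element), k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "b", "c");
        Map<Byte, List<String>> groups =
                groupByKey(events, new NullByteKeySelector<String>());
        // All elements share the single dummy key, so there is exactly one group.
        System.out.println(groups); // prints {0=[a, b, c]}
    }
}
```

Because only one key ever exists, all state and processing for such a stream ends up in a single group, which is consistent with the diff still calling forceNonParallel() in the non-keyed branch.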



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
