[ https://issues.apache.org/jira/browse/FLINK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15878219#comment-15878219 ]

ASF GitHub Bot commented on FLINK-5845:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3375#discussion_r102461630
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---
    @@ -100,27 +119,21 @@ public void open() throws Exception {
     
                if (nfaOperatorState == null) {
                        nfaOperatorState = getPartitionedState(
    -                                   new ValueStateDescriptor<NFA<IN>>(
    -                                           NFA_OPERATOR_STATE_NAME,
    -                                           new NFA.Serializer<IN>()));
    +                           new ValueStateDescriptor<>(NFA_OPERATOR_STATE_NAME, new NFA.Serializer<IN>()));
                }
     
                @SuppressWarnings("unchecked,rawtypes")
                TypeSerializer<StreamRecord<IN>> streamRecordSerializer =
    -                           (TypeSerializer) new StreamElementSerializer<>(getInputSerializer());
    +                   (TypeSerializer) new StreamElementSerializer<>(getInputSerializer());
     
                if (priorityQueueOperatorState == null) {
                        priorityQueueOperatorState = getPartitionedState(
    -                                   new ValueStateDescriptor<>(
    -                                           PRIORIRY_QUEUE_STATE_NAME,
    -                                           new PriorityQueueSerializer<>(
    -                                                           streamRecordSerializer,
    -                                                           new PriorityQueueStreamRecordFactory<IN>())));
    +                           new ValueStateDescriptor<>(PRIORITY_QUEUE_STATE_NAME,
    +                                   new PriorityQueueSerializer<>(streamRecordSerializer, new PriorityQueueStreamRecordFactory<IN>())));
    --- End diff --
    
    If you make reformatting changes, please try not to break a style that is
    already consistent. When breaking a long parameter list, imo, every
    parameter should be on its own line and indented identically. This is not
    the case with `PRIORITY_QUEUE_STATE_NAME`.
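
As a rough illustration of the layout the comment asks for, here is a minimal, self-contained sketch. `Descriptor` and `QueueSerializer` are hypothetical stand-ins for the Flink types in the diff, and the string values are placeholders; only the call-site formatting is the point.

```java
// Sketch only: the helper types below are hypothetical stand-ins, not Flink classes.
final class ParameterListStyle {
    // Placeholder value, assumed for illustration.
    static final String PRIORITY_QUEUE_STATE_NAME = "priorityQueueState";

    // Once a parameter list is broken, each argument gets its own line
    // and all arguments of the same call share one indentation level.
    static Descriptor<QueueSerializer> consistent() {
        return new Descriptor<>(
            PRIORITY_QUEUE_STATE_NAME,
            new QueueSerializer(
                "streamRecordSerializer",
                "priorityQueueStreamRecordFactory"));
    }

    public static void main(String[] args) {
        Descriptor<QueueSerializer> d = consistent();
        System.out.println(d.name + " / " + d.serializer.factory);
    }
}

// Hypothetical stand-in for a state descriptor: a name plus a serializer.
final class Descriptor<T> {
    final String name;
    final T serializer;

    Descriptor(String name, T serializer) {
        this.name = name;
        this.serializer = serializer;
    }
}

// Hypothetical stand-in for a serializer taking two constructor arguments.
final class QueueSerializer {
    final String elementSerializer;
    final String factory;

    QueueSerializer(String elementSerializer, String factory) {
        this.elementSerializer = elementSerializer;
        this.factory = factory;
    }
}
```

In the diff above, the first argument shares a line with the constructor call while the second is wrapped, which is the mix this layout avoids.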


> CEP: unify key and non-keyed operators
> --------------------------------------
>
>                 Key: FLINK-5845
>                 URL: https://issues.apache.org/jira/browse/FLINK-5845
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.3.0
>
>
> Currently the keyed and non-keyed operators in the CEP library have different 
> implementations. This issue aims to unify them into one. 
> The new implementation will always be applied on a keyed stream. For 
> non-keyed use cases, the input stream will be keyed on a dummy key, as is 
> done for the {{DataStream.windowAll()}} method, where the input stream is 
> keyed using the {{NullByteKeySelector}}.
> This is a first step towards making the CEP operators rescalable.
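
The dummy-key idea in the description can be sketched without any Flink dependency. The snippet below is an assumption-laden illustration, not the Flink implementation: `nullByteKey` and `partition` are hypothetical helpers, and a plain `java.util.function.Function` stands in for a key selector. It shows that a selector returning a constant key sends every element to the same partition, so a keyed operator sees the whole stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch only: hypothetical helpers that mimic keying a stream on a dummy key.
final class DummyKeyDemo {
    // Maps every element to the constant key 0, in the spirit of NullByteKeySelector.
    static <T> Function<T, Byte> nullByteKey() {
        return value -> (byte) 0;
    }

    // Groups elements by the key that the selector assigns to them.
    static <T, K> Map<K, List<T>> partition(List<T> input, Function<T, K> keySelector) {
        Map<K, List<T>> partitions = new HashMap<>();
        for (T element : input) {
            partitions
                .computeIfAbsent(keySelector.apply(element), k -> new ArrayList<>())
                .add(element);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "b", "c");
        Map<Byte, List<String>> partitions =
            partition(events, DummyKeyDemo.<String>nullByteKey());
        // All events share the single dummy key.
        System.out.println(partitions);
    }
}
```

Because there is only one key, this arrangement does not add parallelism; it only lets the non-keyed case reuse the keyed code path.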



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
