[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066202#comment-16066202
 ] 

ASF GitHub Bot commented on FLINK-7008:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4195#discussion_r124491313
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java ---
    @@ -324,6 +328,89 @@ public boolean filter(Event value) throws Exception {
                }
        }
     
    +   @Test
    +   public void testNFAChange() {
    +           Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    --- End diff --
    
    I feel the Pattern is a bit too complicated for this test; e.g. the notPattern does not introduce any new corner cases.
    
    I miss a test for the situation where only one ComputationState (CS) is generated. You can simulate it with:
    
        @Test
        public void testNFAChangedOnOneNewComputationState() {
                Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("start");
                        }
                }).followedBy("a*").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 1858562682635302605L;
    
                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
                }).oneOrMore().optional().next("end").where(new IterativeCondition<Event>() {
                        private static final long serialVersionUID = 8061969839441121955L;
    
                        @Override
                        public boolean filter(Event value, Context<Event> ctx) throws Exception {
                                return value.getName().equals("b");
                        }
                }).within(Time.milliseconds(10));
    
                NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true);
                NFA<Event> nfa = nfaFactory.createNFA();
    
                // both the queue of ComputationStates and the eventSharedBuffer have changed
                nfa.nfaChanged = false;
                nfa.process(new Event(6, "start", 1.0), 6L);
                nfa.nfaChanged = false;
                nfa.process(new Event(6, "a", 1.0), 7L);
                assertTrue("NFA status should change as the event matches the take condition of the 'a*' state", nfa.isNFAChanged());
        }


> Update NFA state only when the NFA changes.
> -------------------------------------------
>
>                 Key: FLINK-7008
>                 URL: https://issues.apache.org/jira/browse/FLINK-7008
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>    Affects Versions: 1.3.1
>            Reporter: Kostas Kloudas
>            Assignee: Dian Fu
>             Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.
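
The idea in the issue description can be sketched as a dirty flag, in plain Java with no Flink dependency. ToyNfa, ToyOperator and DirtyFlagSketch are hypothetical stand-ins, not Flink's NFA or AbstractKeyedCEPPatternOperator. A HashMap plays the role of keyed state, and the put counter exists only to make the skipped writes visible.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: every class here is hypothetical, not Flink code.
class DirtyFlagSketch {

    /** Toy automaton that records whether the last event modified it. */
    static class ToyNfa {
        private int takenEvents = 0;
        private boolean changed = false;

        /** Only "start" and "a" are taken; any other event is ignored. */
        void process(String eventName) {
            if (eventName.equals("start") || eventName.equals("a")) {
                takenEvents++;
                changed = true; // internal state was modified
            }
        }

        boolean isChanged() { return changed; }

        void resetChanged() { changed = false; }

        int getTakenEvents() { return takenEvents; }
    }

    /** Toy operator that writes the automaton back only when it changed. */
    static class ToyOperator {
        // Stand-in for keyed state; a real backend would (de)serialize here.
        private final Map<String, ToyNfa> state = new HashMap<>();
        private int statePuts = 0;

        void processElement(String key, String eventName) {
            ToyNfa nfa = state.get(key);
            if (nfa == null) {
                nfa = new ToyNfa();
            }
            nfa.resetChanged();
            nfa.process(eventName);
            if (nfa.isChanged()) { // skip the write when nothing changed
                state.put(key, nfa);
                statePuts++;
            }
        }

        int getStatePuts() { return statePuts; }
    }

    public static void main(String[] args) {
        ToyOperator op = new ToyOperator();
        op.processElement("k", "start"); // taken, so the state is written
        op.processElement("k", "noise"); // ignored, so the write is skipped
        op.processElement("k", "a");     // taken, so the state is written
        System.out.println(op.getStatePuts()); // prints 2
    }
}
```

Resetting the flag before each event mirrors what the suggested test does by hand with nfa.nfaChanged = false before each call to process.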



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
