[ 
https://issues.apache.org/jira/browse/FLINK-6904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16049169#comment-16049169
 ] 

ASF GitHub Bot commented on FLINK-6904:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4121#discussion_r121943356
  
    --- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java 
---
    @@ -1905,6 +2147,275 @@ public boolean filter(Event value) throws Exception 
{
                ));
        }
     
    +   @Test
    +   public void testTimesRangeNonStrictOptional1() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
    +           inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
    +           inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedBy("middle").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           }).times(1, 3).optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("b");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.end)
    +           ));
    +   }
    +
    +   @Test
    +   public void testTimesRangeNonStrictOptional2() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
    +           inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
    +           inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 3));
    +           inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
    +           inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 5));
    +           inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
    +           inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedByAny("middle").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           }).times(2, 
3).allowCombinations().optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("b");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +                   Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
    +                   Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, 
ConsecutiveData.end),
    +                   Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, 
ConsecutiveData.end),
    +                   Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.end)
    +           ));
    +   }
    +
    +   @Test
    +   public void testTimesRangeNonStrictOptional3() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
    +           inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
    +           inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 3));
    +           inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
    +           inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 5));
    +           inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
    +           inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedByAny("middle").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           }).times(2, 3).optional().followedBy("end1").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("b");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +                   Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
    +                   Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, 
ConsecutiveData.end),
    +                   Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.end)
    +           ));
    +   }
    +
    +   @Test
    +   public void testTimesRangeNonStrictWithNext() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
    +           inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 2));
    +           inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3));
    +           inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 4));
    +           inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 5));
    +           inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
    +           inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).next("middle").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           }).times(2, 3).allowCombinations().followedBy("end1").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("b");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +                   Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
    +                   Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end)
    +           ));
    +   }
    +
    +   @Test
    +   public void testTimesRangeNotStrictWithFollowedByEager() {
    --- End diff --
    
    Maybe change the name to `testTimesRangeNotStrictWithFollowedBy`. I think 
eager may be misleading here. What do you think?


> Support for quantifier range to CEP's pattern API
> -------------------------------------------------
>
>                 Key: FLINK-6904
>                 URL: https://issues.apache.org/jira/browse/FLINK-6904
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>
> Currently the quantifier has supported oneOrMore, times(int times), one(),we 
> should also support API such as times(int from, int to) to specify a 
> quantifier range.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to