[ 
https://issues.apache.org/jira/browse/FLINK-7147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16116444#comment-16116444
 ] 

ASF GitHub Bot commented on FLINK-7147:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4296#discussion_r131629581
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java ---
    @@ -0,0 +1,1068 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link Pattern#greedy()}.
    + */
    +public class GreedyITCase extends TestLogger {
    +
    +   @Test
    +   public void testGreedyZeroOrMore() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           Event c = new Event(40, "c", 1.0);
    +           Event a1 = new Event(41, "a", 2.0);
    +           Event a2 = new Event(42, "a", 2.0);
    +           Event a3 = new Event(43, "a", 2.0);
    +           Event d = new Event(44, "d", 3.0);
    +
    +           inputEvents.add(new StreamRecord<>(c, 1));
    +           inputEvents.add(new StreamRecord<>(a1, 2));
    +           inputEvents.add(new StreamRecord<>(a2, 3));
    +           inputEvents.add(new StreamRecord<>(a3, 4));
    +           inputEvents.add(new StreamRecord<>(d, 5));
    +
    +           // c a* d
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedBy("middle").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("d");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(c, a1, a2, a3, d)
    +           ));
    +   }
    +
    +   @Test
    +   public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           Event c = new Event(40, "c", 1.0);
    +           Event a1 = new Event(41, "a", 2.0);
    +           Event a2 = new Event(42, "a", 2.0);
    +           Event a3 = new Event(43, "a", 2.0);
    +           Event d = new Event(44, "d", 3.0);
    +
    +           inputEvents.add(new StreamRecord<>(c, 1));
    +           inputEvents.add(new StreamRecord<>(a1, 2));
    +           inputEvents.add(new StreamRecord<>(a2, 3));
    +           inputEvents.add(new StreamRecord<>(a3, 4));
    +           inputEvents.add(new StreamRecord<>(d, 5));
    +
    +           // c a* d?
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedBy("middle").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           
}).oneOrMore().optional().consecutive().greedy().followedBy("end").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("d");
    +                   }
    +           }).optional();
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(c),
    +                   Lists.newArrayList(c, a1, a2, a3, d)
    +           ));
    +   }
    +
    +   @Test
    +   public void testGreedyZeroOrMoreInBetween() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           Event c = new Event(40, "c", 1.0);
    +           Event a1 = new Event(41, "a", 2.0);
    +           Event a2 = new Event(42, "a", 2.0);
    +           Event a3 = new Event(43, "a", 2.0);
    +           Event d = new Event(44, "d", 3.0);
    +
    +           inputEvents.add(new StreamRecord<>(c, 1));
    +           inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 
2));
    +           inputEvents.add(new StreamRecord<>(a1, 3));
    +           inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 
4));
    +           inputEvents.add(new StreamRecord<>(a2, 5));
    +           inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 
6));
    +           inputEvents.add(new StreamRecord<>(a3, 7));
    +           inputEvents.add(new StreamRecord<>(d, 8));
    +
    +           // c a* d
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedBy("middle").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("d");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(c, a1, a2, a3, d)
    +           ));
    +   }
    +
    +   @Test
    +   public void testGreedyZeroOrMoreWithDummyEventsAfterQuantifier() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           Event c = new Event(40, "c", 1.0);
    +           Event a1 = new Event(41, "a", 2.0);
    +           Event a2 = new Event(42, "a", 2.0);
    +           Event d = new Event(44, "d", 3.0);
    +
    +           inputEvents.add(new StreamRecord<>(c, 1));
    +           inputEvents.add(new StreamRecord<>(a1, 2));
    +           inputEvents.add(new StreamRecord<>(a2, 3));
    +           inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 
4));
    +           inputEvents.add(new StreamRecord<>(d, 5));
    +
    +           // c a* d
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedBy("middle").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("d");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(c, a1, a2, d)
    +           ));
    +   }
    +
    +   @Test
    +   public void testGreedyZeroOrMoreWithDummyEventsBeforeQuantifier() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           Event c = new Event(40, "c", 1.0);
    +           Event d = new Event(44, "d", 3.0);
    +
    +           inputEvents.add(new StreamRecord<>(c, 1));
    +           inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 
2));
    +           inputEvents.add(new StreamRecord<>(d, 5));
    +
    +           // c a* d
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedBy("middle").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("d");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(c, d)
    +           ));
    +   }
    +
    +   @Test
    +   public void testGreedyZeroOrMoreBeforeOptional() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           Event c = new Event(40, "c", 1.0);
    +           Event a1 = new Event(41, "a", 2.0);
    +           Event a2 = new Event(42, "a", 2.0);
    +           Event e = new Event(44, "e", 3.0);
    +
    +           inputEvents.add(new StreamRecord<>(c, 1));
    +           inputEvents.add(new StreamRecord<>(a1, 2));
    +           inputEvents.add(new StreamRecord<>(a2, 3));
    +           inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 
4));
    +           inputEvents.add(new StreamRecord<>(e, 5));
    +
    +           // c a* d? e
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedBy("middle1").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           
}).oneOrMore().optional().greedy().followedBy("middle2").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("d");
    +                   }
    +           }).optional().followedBy("end").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("e");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(c, a1, a2, e)
    +           ));
    +   }
    +
    +   @Test
    +   public void testGreedyZeroOrMoreBeforeOptional2() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           Event c = new Event(40, "c", 1.0);
    +           Event a1 = new Event(41, "a", 2.0);
    +           Event a2 = new Event(42, "a", 2.0);
    +           Event d = new Event(43, "d", 3.0);
    +           Event e = new Event(44, "e", 3.0);
    +
    +           inputEvents.add(new StreamRecord<>(c, 1));
    +           inputEvents.add(new StreamRecord<>(a1, 2));
    +           inputEvents.add(new StreamRecord<>(a2, 3));
    +           inputEvents.add(new StreamRecord<>(d, 4));
    +           inputEvents.add(new StreamRecord<>(e, 5));
    +
    +           // c a* d? e
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedBy("middle1").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           
}).oneOrMore().optional().greedy().followedBy("middle2").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("d");
    +                   }
    +           }).optional().followedBy("end").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("e");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(c, a1, a2, e),
    +                   Lists.newArrayList(c, a1, a2, d, e)
    +           ));
    +   }
    +
    +   @Test
    +   public void testGreedyUtilZeroOrMoreWithDummyEventsAfterQuantifier() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           Event c = new Event(40, "c", 1.0);
    +           Event a1 = new Event(41, "a", 2.0);
    +           Event a2 = new Event(42, "a", 3.0);
    +           Event a3 = new Event(43, "a", 3.0);
    +           Event d = new Event(45, "d", 3.0);
    +
    +           inputEvents.add(new StreamRecord<>(c, 1));
    +           inputEvents.add(new StreamRecord<>(a1, 2));
    +           inputEvents.add(new StreamRecord<>(a2, 3));
    +           inputEvents.add(new StreamRecord<>(a3, 4));
    +           inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 5));
    +           inputEvents.add(new StreamRecord<>(d, 6));
    +
    +           // c a* d, where a* is cut off by until(price > 3.0)
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedBy("middle").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           }).oneOrMore().optional().greedy().until(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getPrice() > 3.0;
    +                   }
    +           }).followedBy("end").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("d");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(c, a1, a2, a3, d)
    +           ));
    +   }
    +
    +   @Test
    +   public void testGreedyUtilWithDummyEventsBeforeQuantifier() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           Event c = new Event(40, "c", 1.0);
    +           Event a1 = new Event(41, "a", 2.0);
    +           Event a2 = new Event(42, "a", 3.0);
    +           Event a3 = new Event(43, "a", 3.0);
    +           Event d = new Event(45, "d", 3.0);
    +
    +           inputEvents.add(new StreamRecord<>(c, 1));
    +           inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 2));
    +           inputEvents.add(new StreamRecord<>(a1, 3));
    +           inputEvents.add(new StreamRecord<>(a2, 4));
    +           inputEvents.add(new StreamRecord<>(a3, 5));
    +           inputEvents.add(new StreamRecord<>(d, 6));
    +
    +           // c a* d, where a* is cut off by until(price > 3.0)
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedBy("middle").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           }).oneOrMore().optional().greedy().until(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getPrice() > 3.0;
    +                   }
    +           }).followedBy("end").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("d");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(c, d)
    +           ));
    +   }
    +
    +   @Test
    +   public void testGreedyOneOrMore() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           Event c = new Event(40, "c", 1.0);
    +           Event a1 = new Event(41, "a", 2.0);
    +           Event a2 = new Event(42, "a", 2.0);
    +           Event a3 = new Event(43, "a", 2.0);
    +           Event d = new Event(44, "d", 3.0);
    +
    +           inputEvents.add(new StreamRecord<>(c, 1));
    +           inputEvents.add(new StreamRecord<>(a1, 2));
    +           inputEvents.add(new StreamRecord<>(a2, 3));
    +           inputEvents.add(new StreamRecord<>(a3, 4));
    +           inputEvents.add(new StreamRecord<>(d, 5));
    +
    +           // c a+ d
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedBy("middle").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           }).oneOrMore().greedy().followedBy("end").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("d");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(c, a1, a2, a3, d)
    +           ));
    +   }
    +
    +   @Test
    +   public void testGreedyOneOrMoreInBetween() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           Event c = new Event(40, "c", 1.0);
    +           Event a1 = new Event(41, "a", 2.0);
    +           Event a2 = new Event(42, "a", 2.0);
    +           Event a3 = new Event(43, "a", 2.0);
    +           Event d = new Event(44, "d", 3.0);
    +
    +           inputEvents.add(new StreamRecord<>(c, 1));
    +           inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 
2));
    +           inputEvents.add(new StreamRecord<>(a1, 3));
    +           inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 
4));
    +           inputEvents.add(new StreamRecord<>(a2, 5));
    +           inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 
6));
    +           inputEvents.add(new StreamRecord<>(a3, 7));
    +           inputEvents.add(new StreamRecord<>(d, 8));
    +
    +           // c a+ d
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedBy("middle").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           }).oneOrMore().greedy().followedBy("end").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("d");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(c, a1, a2, a3, d)
    +           ));
    +   }
    +
    +   @Test
    +   public void testGreedyOneOrMoreWithDummyEventsAfterQuantifier() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           Event c = new Event(40, "c", 1.0);
    +           Event a1 = new Event(41, "a", 2.0);
    +           Event a2 = new Event(42, "a", 2.0);
    +           Event d = new Event(44, "d", 3.0);
    +
    +           inputEvents.add(new StreamRecord<>(c, 1));
    +           inputEvents.add(new StreamRecord<>(a1, 2));
    +           inputEvents.add(new StreamRecord<>(a2, 3));
    +           inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 
4));
    +           inputEvents.add(new StreamRecord<>(d, 5));
    +
    +           // c a+ d
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedBy("middle").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           }).oneOrMore().greedy().followedBy("end").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("d");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(c, a1, a2, d)
    +           ));
    +   }
    +
    +   @Test
    +   public void testGreedyOneOrMoreWithDummyEventsBeforeQuantifier() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           Event c = new Event(40, "c", 1.0);
    +           Event d = new Event(44, "d", 3.0);
    +
    +           inputEvents.add(new StreamRecord<>(c, 1));
    +           inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 
2));
    +           inputEvents.add(new StreamRecord<>(d, 5));
    +
    +           // c a+ d
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedBy("middle").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           }).oneOrMore().greedy().followedBy("end").where(new 
SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("d");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, 
Lists.<List<Event>>newArrayList());
    +   }
    +
    +   @Test
    +   public void testGreedyUtilOneOrMoreWithDummyEventsAfterQuantifier() {
    +           List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +           Event c = new Event(40, "c", 1.0);
    +           Event a1 = new Event(41, "a", 2.0);
    +           Event a2 = new Event(42, "a", 3.0);
    +           Event a3 = new Event(43, "a", 3.0);
    +           Event d = new Event(45, "d", 3.0);
    +
    +           inputEvents.add(new StreamRecord<>(c, 1));
    +           inputEvents.add(new StreamRecord<>(a1, 2));
    +           inputEvents.add(new StreamRecord<>(a2, 3));
    +           inputEvents.add(new StreamRecord<>(a3, 4));
    +           inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 5));
    +           inputEvents.add(new StreamRecord<>(d, 6));
    +
    +           // c a+ d
    +           Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("c");
    +                   }
    +           }).followedBy("middle").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("a");
    +                   }
    +           }).oneOrMore().greedy().until(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getPrice() > 3.0;
    +                   }
    +           }).followedBy("end").where(new SimpleCondition<Event>() {
    +                   private static final long serialVersionUID = 
5726188262756267490L;
    +
    +                   @Override
    +                   public boolean filter(Event value) throws Exception {
    +                           return value.getName().equals("d");
    +                   }
    +           });
    +
    +           NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
    +
    +           final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
    +
    +           compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +                   Lists.newArrayList(c, a1, a2, a3, d)
    +           ));
    +   }
    +
    +   @Test
    +   public void testGreedyUtilOneOrMoreWithDummyEventsBeforeQuantifier() {
    --- End diff --
    
    Typo in the method name: `testGreedyUtilOneOrMoreWithDummyEventsBeforeQuantifier` -> 
`testGreedyUntilOneOrMoreWithDummyEventsBeforeQuantifier` ("Util" should be "Until").


> Support greedy quantifier in CEP
> --------------------------------
>
>                 Key: FLINK-7147
>                 URL: https://issues.apache.org/jira/browse/FLINK-7147
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP, Table API & SQL
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>
> A greedy quantifier tries to match the token as many times as possible. For 
> example, given the pattern {{a b* c}} (with skip-till-next) and the input {{a b1 b2 
> c}}, if the quantifier for {{b}} is greedy, the only output is {{a b1 b2 c}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to