[FLINK-6165] [cep] Implement internal continuity for looping states.

Allows looping states (oneOrMore, zeroOrMore, times) to specify
if they want their elements to be consecutive or allow non-matching
elements in-between.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa3c395b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa3c395b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa3c395b

Branch: refs/heads/table-retraction
Commit: aa3c395b97943e312dd16964b363f0e4f86c6739
Parents: d4665a0
Author: Dawid Wysakowicz <[email protected]>
Authored: Mon Mar 27 15:05:11 2017 +0200
Committer: kl0u <[email protected]>
Committed: Thu Mar 30 10:24:19 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |  66 ++++
 .../flink/cep/scala/pattern/Pattern.scala       |  29 ++
 .../java/org/apache/flink/cep/nfa/State.java    |   6 +-
 .../flink/cep/nfa/compiler/NFACompiler.java     | 189 +++++-----
 .../org/apache/flink/cep/pattern/Pattern.java   |  71 ++++
 .../apache/flink/cep/pattern/Quantifier.java    |  18 +-
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 374 +++++++++++++++----
 7 files changed, 603 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 932ba30..bb704c7 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -396,6 +396,7 @@ patternState.within(Time.seconds(10));
           <td>
               <p>Specifies that this pattern can occur zero or more 
times(kleene star). This means any number of events can be matched in this 
state.</p>
               <p>If eagerness is enabled(by default) for a pattern A*B and 
sequence A1 A2 B will generate patterns: B, A1 B and A1 A2 B. If disabled B, A1 
B, A2 B and A1 A2 B.</p>
+              <p>By default a relaxed internal continuity (between subsequent 
events of a loop) is used. For more info on the internal continuity see <a 
href="#consecutive_java">consecutive</a></p>
       {% highlight java %}
       patternState.zeroOrMore();
       {% endhighlight %}
@@ -406,6 +407,7 @@ patternState.within(Time.seconds(10));
           <td>
               <p>Specifies that this pattern can occur one or more 
times(kleene star). This means at least one and at most infinite number of 
events can be matched in this state.</p>
               <p>If eagerness is enabled (by default) for a pattern A*B and 
sequence A1 A2 B will generate patterns: A1 B and A1 A2 B. If disabled A1 B, A2 
B and A1 A2 B.</p>
+              <p>By default a relaxed internal continuity (between subsequent 
events of a loop) is used. For more info on the internal continuity see <a 
href="#consecutive_java">consecutive</a></p>
       {% highlight java %}
       patternState.oneOrMore();
       {% endhighlight %}
@@ -424,11 +426,50 @@ patternState.within(Time.seconds(10));
           <td><strong>Times</strong></td>
           <td>
               <p>Specifies exact number of times that this pattern should be 
matched.</p>
+              <p>By default a relaxed internal continuity (between subsequent 
events of a loop) is used. For more info on the internal continuity see <a 
href="#consecutive_java">consecutive</a></p>
       {% highlight java %}
       patternState.times(2);
       {% endhighlight %}
           </td>
        </tr>
+       <tr>
+          <td><strong>Consecutive</strong><a name="consecutive_java"></a></td>
+          <td>
+              <p>Works in conjunction with zeroOrMore, oneOrMore or times. 
Specifies that any non-matching element breaks the loop.</p>
+              
+              <p>If not applied, a relaxed continuity (as in followedBy) is 
used.</p>
+
+          <p>E.g. a pattern like:</p>
+      {% highlight java %}
+      Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+           @Override
+           public boolean filter(Event value) throws Exception {
+               return value.getName().equals("c");
+           }
+      })
+      .followedBy("middle").where(new SimpleCondition<Event>() {
+           @Override
+           public boolean filter(Event value) throws Exception {
+               return value.getName().equals("a");
+           }
+      })
+      .oneOrMore(true).consecutive()
+      .followedBy("end1").where(new SimpleCondition<Event>() {
+           @Override
+           public boolean filter(Event value) throws Exception {
+               return value.getName().equals("b");
+           }
+      });
+      {% endhighlight %}
+
+             <p>Will generate the following matches for a sequence: C D A1 A2 
A3 D A4 B</p>
+
+             <p>with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 
B}</p>
+             <p>without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 
A3 B}, {C A1 A2 A3 A4 B}</p>
+
+             <p><b>NOTICE:</b> This option can be applied only to 
zeroOrMore(), oneOrMore() and times()!</p>
+          </td>
+       </tr>
   </tbody>
 </table>
 </div>
@@ -511,6 +552,7 @@ patternState.within(Time.seconds(10))
           <td>
               <p>Specifies that this pattern can occur zero or more 
times(kleene star). This means any number of events can be matched in this 
state.</p>
               <p>If eagerness is enabled(by default) for a pattern A*B and 
sequence A1 A2 B will generate patterns: B, A1 B and A1 A2 B. If disabled B, A1 
B, A2 B and A1 A2 B.</p>
+              <p>By default a relaxed internal continuity (between subsequent 
events of a loop) is used. For more info on the internal continuity see <a 
href="#consecutive_scala">consecutive</a></p>
       {% highlight scala %}
       patternState.zeroOrMore()
       {% endhighlight %}
@@ -521,6 +563,7 @@ patternState.within(Time.seconds(10))
           <td>
               <p>Specifies that this pattern can occur one or more 
times(kleene star). This means at least one and at most infinite number of 
events can be matched in this state.</p>
               <p>If eagerness is enabled (by default) for a pattern A*B and 
sequence A1 A2 B will generate patterns: A1 B and A1 A2 B. If disabled A1 B, A2 
B and A1 A2 B.</p>
+              <p>By default a relaxed internal continuity (between subsequent 
events of a loop) is used. For more info on the internal continuity see <a 
href="#consecutive_scala">consecutive</a></p>
       {% highlight scala %}
       patternState.oneOrMore()
       {% endhighlight %}
@@ -539,11 +582,34 @@ patternState.within(Time.seconds(10))
           <td><strong>Times</strong></td>
           <td>
               <p>Specifies exact number of times that this pattern should be 
matched.</p>
+              <p>By default a relaxed internal continuity (between subsequent 
events of a loop) is used. For more info on the internal continuity see <a 
href="#consecutive_scala">consecutive</a></p>
       {% highlight scala %}
       patternState.times(2)
       {% endhighlight %}
           </td>
        </tr>
+       <tr>
+          <td><strong>Consecutive</strong><a name="consecutive_scala"></a></td>
+          <td>
+            <p>Works in conjunction with zeroOrMore, oneOrMore or times. 
Specifies that any non-matching element breaks the loop.</p>
+            
+            <p>If not applied, a relaxed continuity (as in followedBy) is 
used.</p>
+            
+      {% highlight scala %}
+      Pattern.begin("start").where(_.getName().equals("c"))
+       .followedBy("middle").where(_.getName().equals("a"))
+                            .oneOrMore(true).consecutive()
+       .followedBy("end1").where(_.getName().equals("b"));
+      {% endhighlight %}
+
+            <p>Will generate the following matches for a sequence: C D A1 A2 
A3 D A4 B</p>
+
+            <p>with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 
B}</p>
+            <p>without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 
B}, {C A1 A2 A3 A4 B}</p>
+
+            <p><b>NOTICE:</b> This option can be applied only to zeroOrMore(), 
oneOrMore() and times()!</p>
+          </td>
+       </tr>
   </tbody>
 </table>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 07dfc5a..c636029 100644
--- 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -270,6 +270,35 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     this
   }
 
+
+  /**
+    * Works in conjunction with 
[[org.apache.flink.cep.scala.pattern.Pattern#zeroOrMore()]],
+    * [[org.apache.flink.cep.scala.pattern.Pattern#oneOrMore()]] or
+    * [[org.apache.flink.cep.scala.pattern.Pattern#times(int)]].
+    * Specifies that any non-matching element breaks the loop.
+    *
+    * <p>E.g. a pattern like:
+    * {{{
+    * Pattern.begin("start").where(_.getName().equals("c"))
+    *        
.followedBy("middle").where(_.getName().equals("a")).oneOrMore(true).consecutive()
+    *        .followedBy("end1").where(_.getName().equals("b"));
+    * }}}
+    *
+    * <p>for a sequence: C D A1 A2 A3 D A4 B
+    *
+    * <p>will generate matches: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}
+    *
+    * <p><b>NOTICE:</b> This operator can be applied only when either 
zeroOrMore,
+    * oneOrMore or times was previously applied!
+    *
+    * <p>By default a relaxed continuity is applied.
+    * @return pattern with continuity changed to strict
+    */
+  def consecutive(): Pattern[T, F] = {
+    jPattern.consecutive()
+    this
+  }
+
 }
 
 object Pattern {

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index c673576..2503ffd 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -41,7 +41,7 @@ public class State<T> implements Serializable {
        private static final long serialVersionUID = 6658700025989097781L;
 
        private final String name;
-       private final StateType stateType;
+       private StateType stateType;
        private final Collection<StateTransition<T>> stateTransitions;
 
        public State(final String name, final StateType stateType) {
@@ -65,6 +65,10 @@ public class State<T> implements Serializable {
                return stateTransitions;
        }
 
+       public void makeStart() {
+               this.stateType = StateType.Start;
+       }
+
        private void addStateTransition(
                        final StateTransitionAction action,
                        final State<T> targetState,

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 4fb918f..e441c4b 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -152,6 +152,7 @@ public class NFACompiler {
 
                /**
                 * Creates all the states between Start and Final state.
+                *
                 * @param sinkState the state that last state should point to 
(always the Final state)
                 * @return the next state after Start in the resulting graph
                 */
@@ -160,27 +161,25 @@ public class NFACompiler {
                        State<T> lastSink = sinkState;
                        while (currentPattern.getPrevious() != null) {
                                checkPatternNameUniqueness();
-
-                               State<T> sourceState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
-                               states.add(sourceState);
-                               usedNames.add(sourceState.getName());
+                               usedNames.add(currentPattern.getName());
 
                                if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
-                                       convertToLooping(sourceState, lastSink);
+                                       final State<T> looping = 
createLooping(lastSink);
 
                                        if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
-                                               sourceState = 
createFirstMandatoryStateOfLoop(sourceState, State.StateType.Normal);
-                                               states.add(sourceState);
-                                               
usedNames.add(sourceState.getName());
+                                               lastSink = 
createFirstMandatoryStateOfLoop(looping);
+                                       } else if (currentPattern instanceof 
FollowedByPattern &&
+                                                               
currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT)) {
+                                               lastSink = 
createWaitingStateForZeroOrMore(looping, lastSink);
+                                       } else {
+                                               lastSink = looping;
                                        }
-                               } else if (currentPattern.getQuantifier() == 
Quantifier.TIMES) {
-                                       sourceState = 
convertToTimesState(sourceState, lastSink, currentPattern.getTimes());
+                               } else if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.TIMES)) {
+                                       lastSink = createTimesState(lastSink, 
currentPattern.getTimes());
                                } else {
-                                       convertToSingletonState(sourceState, 
lastSink);
+                                       lastSink = 
createSingletonState(lastSink);
                                }
-
                                currentPattern = currentPattern.getPrevious();
-                               lastSink = sourceState;
 
                                final Time currentWindowTime = 
currentPattern.getWindowTime();
                                if (currentWindowTime != null && 
currentWindowTime.toMilliseconds() < windowTime) {
@@ -192,6 +191,30 @@ public class NFACompiler {
                        return lastSink;
                }
 
+               /**
+                * Creates a pair of states that enables relaxed strictness 
before a zeroOrMore looping state.
+                *
+                * @param loopingState the first state of zeroOrMore complex 
state
+                * @param lastSink     the state that the looping one points to
+                * @return the newly created state
+                */
+               private State<T> createWaitingStateForZeroOrMore(final State<T> 
loopingState, final State<T> lastSink) {
+                       final State<T> followByState = createNormalState();
+                       final State<T> followByStateWithoutProceed = 
createNormalState();
+
+                       final IterativeCondition<T> currentFunction = 
(IterativeCondition<T>)currentPattern.getCondition();
+                       final IterativeCondition<T> ignoreFunction = 
getIgnoreCondition(currentPattern);
+
+                       followByState.addProceed(lastSink, 
BooleanConditions.<T>trueFunction());
+                       followByState.addIgnore(followByStateWithoutProceed, 
ignoreFunction);
+                       followByState.addTake(loopingState, currentFunction);
+
+                       followByStateWithoutProceed.addIgnore(ignoreFunction);
+                       followByStateWithoutProceed.addTake(loopingState, 
currentFunction);
+
+                       return followByState;
+               }
+
                private void checkPatternNameUniqueness() {
                        if (usedNames.contains(currentPattern.getName())) {
                                throw new MalformedPatternException(
@@ -202,110 +225,112 @@ public class NFACompiler {
 
                /**
                 * Creates the Start {@link State} of the resulting NFA graph.
+                *
                 * @param sinkState the state that Start state should point to 
(alwyas first state of middle states)
                 * @return created state
                 */
                @SuppressWarnings("unchecked")
                private State<T> createStartState(State<T> sinkState) {
                        checkPatternNameUniqueness();
+                       usedNames.add(currentPattern.getName());
 
                        final State<T> beginningState;
                        if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
-                               final State<T> loopingState;
+                               final State<T> loopingState = 
createLooping(sinkState);
                                if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
-                                       loopingState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
-                                       beginningState = 
createFirstMandatoryStateOfLoop(loopingState, State.StateType.Start);
-                                       states.add(loopingState);
+                                       beginningState = 
createFirstMandatoryStateOfLoop(loopingState);
                                } else {
-                                       loopingState = new 
State<>(currentPattern.getName(), State.StateType.Start);
                                        beginningState = loopingState;
                                }
-                               convertToLooping(loopingState, sinkState, true);
-                       } else  {
-                               if (currentPattern.getQuantifier() == 
Quantifier.TIMES && currentPattern.getTimes() > 1) {
-                                       final State<T> timesState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
-                                       states.add(timesState);
-                                       sinkState = 
convertToTimesState(timesState, sinkState, currentPattern.getTimes() - 1);
-                               }
-
-                               beginningState = new 
State<>(currentPattern.getName(), State.StateType.Start);
-                               convertToSingletonState(beginningState, 
sinkState);
+                       } else if 
(currentPattern.getQuantifier().hasProperty(QuantifierProperty.TIMES)) {
+                               beginningState = createTimesState(sinkState, 
currentPattern.getTimes());
+                       } else {
+                               beginningState = 
createSingletonState(sinkState);
                        }
 
-                       states.add(beginningState);
-                       usedNames.add(beginningState.getName());
+                       beginningState.makeStart();
 
                        return beginningState;
                }
 
                /**
-                * Converts the given state into a "complex" state consisting 
of given number of states with
+                * Creates a "complex" state consisting of given number of 
states with
                 * same {@link IterativeCondition}
                 *
-                * @param sourceState the state to be converted
-                * @param sinkState the state that the converted state should 
point to
-                * @param times number of times the state should be copied
+                * @param sinkState the state that the created state should 
point to
+                * @param times     number of times the state should be copied
                 * @return the first state of the "complex" state, next state 
should point to it
                 */
-               private State<T> convertToTimesState(final State<T> 
sourceState, final State<T> sinkState, int times) {
-                       convertToSingletonState(sourceState, sinkState);
-                       State<T> lastSink;
-                       State<T> firstState = sourceState;
+               private State<T> createTimesState(final State<T> sinkState, int 
times) {
+                       State<T> lastSink = sinkState;
                        for (int i = 0; i < times - 1; i++) {
-                               lastSink = firstState;
-                               firstState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
-                               states.add(firstState);
-                               convertToSingletonState(firstState, lastSink);
+                               lastSink = createSingletonState(
+                                       lastSink,
+                                       currentPattern instanceof 
FollowedByPattern &&
+                                       
!currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT));
                        }
-                       return firstState;
+                       return createSingletonState(lastSink, currentPattern 
instanceof FollowedByPattern);
                }
 
                /**
-                * Converts the given state into a simple single state. For an 
OPTIONAL state it also consists
+                * Creates a simple single state. For an OPTIONAL state it also 
consists
                 * of a similar state without the PROCEED edge, so that for 
each PROCEED transition branches
                 * in computation state graph  can be created only once.
                 *
-                * @param sourceState the state to be converted
                 * @param sinkState state that the state being converted should 
point to
+                * @return the created state
                 */
                @SuppressWarnings("unchecked")
-               private void convertToSingletonState(final State<T> 
sourceState, final State<T> sinkState) {
+               private State<T> createSingletonState(final State<T> sinkState) 
{
+                       return createSingletonState(sinkState, currentPattern 
instanceof FollowedByPattern);
+               }
 
+               /**
+                * Creates a simple single state. For an OPTIONAL state it also 
consists
+                * of a similar state without the PROCEED edge, so that for 
each PROCEED transition branches
+                * in computation state graph  can be created only once.
+                *
+                * @param addIgnore whether an IGNORE transition should be added
+                * @param sinkState state that the created state should 
point to
+                * @return the created state
+                */
+               @SuppressWarnings("unchecked")
+               private State<T> createSingletonState(final State<T> sinkState, 
boolean addIgnore) {
                        final IterativeCondition<T> currentFilterFunction = 
(IterativeCondition<T>) currentPattern.getCondition();
                        final IterativeCondition<T> trueFunction = 
BooleanConditions.trueFunction();
 
-                       sourceState.addTake(sinkState, currentFilterFunction);
+                       final State<T> singletonState = createNormalState();
+                       singletonState.addTake(sinkState, 
currentFilterFunction);
 
                        if (currentPattern.getQuantifier() == 
Quantifier.OPTIONAL) {
-                               sourceState.addProceed(sinkState, trueFunction);
+                               singletonState.addProceed(sinkState, 
trueFunction);
                        }
 
-                       if (currentPattern instanceof FollowedByPattern) {
+                       if (addIgnore) {
                                final State<T> ignoreState;
                                if (currentPattern.getQuantifier() == 
Quantifier.OPTIONAL) {
-                                       ignoreState = new 
State<>(currentPattern.getName(), State.StateType.Normal);
+                                       ignoreState = createNormalState();
                                        ignoreState.addTake(sinkState, 
currentFilterFunction);
-                                       states.add(ignoreState);
                                } else {
-                                       ignoreState = sourceState;
+                                       ignoreState = singletonState;
                                }
-                               sourceState.addIgnore(ignoreState, 
trueFunction);
+                               singletonState.addIgnore(ignoreState, 
trueFunction);
                        }
+                       return singletonState;
                }
 
                /**
-                * Patterns with quantifiers AT_LEAST_ONE_* are converted into 
pair of states: a singleton state and
+                * Patterns with quantifiers AT_LEAST_ONE_* are created as a 
pair of states: a singleton state and
                 * looping state. This method creates the first of the two.
                 *
                 * @param sinkState the state the newly created state should 
point to, it should be a looping state
-                * @param stateType the type of the created state, as the NFA 
graph can also start wit AT_LEAST_ONE_*
                 * @return the newly created state
                 */
                @SuppressWarnings("unchecked")
-               private State<T> createFirstMandatoryStateOfLoop(final State<T> 
sinkState, final State.StateType stateType) {
+               private State<T> createFirstMandatoryStateOfLoop(final State<T> 
sinkState) {
 
                        final IterativeCondition<T> currentFilterFunction = 
(IterativeCondition<T>) currentPattern.getCondition();
-                       final State<T> firstState = new 
State<>(currentPattern.getName(), stateType);
+                       final State<T> firstState = createNormalState();
 
                        firstState.addTake(sinkState, currentFilterFunction);
                        if (currentPattern instanceof FollowedByPattern) {
@@ -316,49 +341,45 @@ public class NFACompiler {
                }
 
                /**
-                * Converts the given state into looping one. Looping state is 
one with TAKE edge to itself and
+                * Creates a looping state. A looping state is 
one with TAKE edge to itself and
                 * PROCEED edge to the sinkState. It also consists of a similar 
state without the PROCEED edge, so that
                 * for each PROCEED transition branches in computation state 
graph  can be created only once.
                 *
-                * <p>If this looping state is first of a graph we should treat 
the {@link Pattern} as {@link FollowedByPattern}
-                * to enable combinations.
-                *
-                * @param sourceState  the state to converted
-                * @param sinkState    the state that the converted state 
should point to
-                * @param isFirstState if the looping state is first of a graph
+                * @param sinkState the state that the created state should 
point to
+                * @return the first state of the created complex state
                 */
                @SuppressWarnings("unchecked")
-               private void convertToLooping(final State<T> sourceState, final 
State<T> sinkState, boolean isFirstState) {
+               private State<T> createLooping(final State<T> sinkState) {
 
+                       final State<T> loopingState = createNormalState();
                        final IterativeCondition<T> filterFunction = 
(IterativeCondition<T>) currentPattern.getCondition();
-                       final IterativeCondition<T> trueFunction = 
BooleanConditions.<T>trueFunction();
+                       final IterativeCondition<T> trueFunction = 
BooleanConditions.trueFunction();
 
-                       sourceState.addProceed(sinkState, trueFunction);
-                       sourceState.addTake(filterFunction);
-                       if (currentPattern instanceof FollowedByPattern || 
isFirstState) {
-                               final State<T> ignoreState = new State<>(
-                                       currentPattern.getName(),
-                                       State.StateType.Normal);
+                       loopingState.addProceed(sinkState, trueFunction);
+                       loopingState.addTake(filterFunction);
+                       if 
(!currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT)) {
+                               final State<T> ignoreState = 
createNormalState();
 
                                final IterativeCondition<T> ignoreCondition = 
getIgnoreCondition(currentPattern);
 
-                               sourceState.addIgnore(ignoreState, 
ignoreCondition);
-                               ignoreState.addTake(sourceState, 
filterFunction);
-                               ignoreState.addIgnore(ignoreState, 
ignoreCondition);
-                               states.add(ignoreState);
+                               ignoreState.addTake(loopingState, 
filterFunction);
+                               ignoreState.addIgnore(ignoreCondition);
+                               loopingState.addIgnore(ignoreState, 
ignoreCondition);
                        }
+
+                       return loopingState;
                }
 
                /**
-                * Converts the given state into looping one. Looping state is 
one with TAKE edge to itself and
-                * PROCEED edge to the sinkState. It also consists of a similar 
state without the PROCEED edge, so that
-                * for each PROCEED transition branches in computation state 
graph  can be created only once.
+                * Creates a state with {@link State.StateType#Normal} and adds 
it to the collection of created states.
+                * Should be used instead of instantiating with new operator.
                 *
-                * @param sourceState the state to converted
-                * @param sinkState   the state that the converted state should 
point to
+                * @return the created state
                 */
-               private void convertToLooping(final State<T> sourceState, final 
State<T> sinkState) {
-                       convertToLooping(sourceState, sinkState, false);
+               private State<T> createNormalState() {
+                       final State<T> state = new 
State<>(currentPattern.getName(), State.StateType.Normal);
+                       states.add(state);
+                       return state;
                }
 
                /**
@@ -381,7 +402,7 @@ public class NFACompiler {
         * has at most one TAKE and one IGNORE and name of each state is 
unique. No PROCEED transition is allowed!
         *
         * @param oldStartState dummy start state of old graph
-        * @param <T> type of events
+        * @param <T>           type of events
         * @return map of new states, where key is the name of a state and 
value is the state itself
         */
        @Internal

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index cd51788..14c3e2d 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -267,6 +267,76 @@ public class Pattern<T, F extends T> {
        }
 
        /**
+        * Works in conjunction with {@link Pattern#zeroOrMore()}, {@link 
Pattern#oneOrMore()} or {@link Pattern#times(int)}.
+        * Specifies that any non-matching element breaks the loop.
+        *
+        * <p>E.g. a pattern like:
+        * <pre>{@code
+        * Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+        *      @Override
+        *      public boolean filter(Event value) throws Exception {
+        *          return value.getName().equals("c");
+        *      }
+        * })
+        * .followedBy("middle").where(new FilterFunction<Event>() {
+        *      @Override
+        *      public boolean filter(Event value) throws Exception {
+        *          return value.getName().equals("a");
+        *      }
+        * })
+        * }<b>.oneOrMore(true).consecutive()</b>{@code
+        * .followedBy("end1").where(new FilterFunction<Event>() {
+        *      @Override
+        *      public boolean filter(Event value) throws Exception {
+        *          return value.getName().equals("b");
+        *      }
+        * });
+        * }</pre>
+        *
+        * <p>for a sequence: C D A1 A2 A3 D A4 B
+        *
+        * <p>will generate matches: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}
+        *
+        * <p><b>NOTICE:</b> This operator can be applied only when either 
zeroOrMore,
+        * oneOrMore or times was previously applied!
+        *
+        * <p>By default a relaxed continuity is applied.
+        *
+        * @return pattern with continuity changed to strict
+        */
+       public Pattern<T, F> consecutive() {
+               switch (this.quantifier) {
+
+                       case ZERO_OR_MORE_EAGER:
+                               this.quantifier = 
Quantifier.ZERO_OR_MORE_EAGER_STRICT;
+                               break;
+                       case ZERO_OR_MORE_COMBINATIONS:
+                               this.quantifier = 
Quantifier.ZERO_OR_MORE_COMBINATIONS_STRICT;
+                               break;
+                       case ONE_OR_MORE_EAGER:
+                               this.quantifier = 
Quantifier.ONE_OR_MORE_EAGER_STRICT;
+                               break;
+                       case ONE_OR_MORE_COMBINATIONS:
+                               this.quantifier = 
Quantifier.ONE_OR_MORE_COMBINATIONS_STRICT;
+                               break;
+                       case TIMES:
+                               this.quantifier = Quantifier.TIMES_STRICT;
+                               break;
+                       case ZERO_OR_MORE_COMBINATIONS_STRICT:
+                       case ONE_OR_MORE_EAGER_STRICT:
+                       case ONE_OR_MORE_COMBINATIONS_STRICT:
+                       case ZERO_OR_MORE_EAGER_STRICT:
+                       case TIMES_STRICT:
+                               throw new MalformedPatternException("Strict 
continuity already applied! consecutive() called twice.");
+                       case ONE:
+                       case OPTIONAL:
+                               throw new MalformedPatternException("Strict 
continuity cannot be applied to " + this.quantifier);
+               }
+
+               return this;
+       }
+
+       /**
         * Specifies that this pattern can occur zero or once.
         *
         * @return The same pattern with applied Kleene ? operator
@@ -300,4 +370,5 @@ public class Pattern<T, F extends T> {
                        throw new MalformedPatternException("Already applied 
quantifier to this Pattern. Current quantifier is: " + this.quantifier);
                }
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index 7abe9bd..9789072 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -23,12 +23,24 @@ public enum Quantifier {
        ONE,
        ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, 
QuantifierProperty.EAGER),
        ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING),
+       ZERO_OR_MORE_EAGER_STRICT(QuantifierProperty.EAGER, 
QuantifierProperty.STRICT, QuantifierProperty.LOOPING),
+       ZERO_OR_MORE_COMBINATIONS_STRICT(QuantifierProperty.STRICT, 
QuantifierProperty.LOOPING),
        ONE_OR_MORE_EAGER(
                QuantifierProperty.LOOPING,
                QuantifierProperty.EAGER,
                QuantifierProperty.AT_LEAST_ONE),
+       ONE_OR_MORE_EAGER_STRICT(
+               QuantifierProperty.STRICT,
+               QuantifierProperty.LOOPING,
+               QuantifierProperty.EAGER,
+               QuantifierProperty.AT_LEAST_ONE),
        ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, 
QuantifierProperty.AT_LEAST_ONE),
-       TIMES,
+       ONE_OR_MORE_COMBINATIONS_STRICT(
+               QuantifierProperty.STRICT,
+               QuantifierProperty.LOOPING,
+               QuantifierProperty.AT_LEAST_ONE),
+       TIMES(QuantifierProperty.TIMES),
+       TIMES_STRICT(QuantifierProperty.TIMES, QuantifierProperty.STRICT),
        OPTIONAL;
 
        private final EnumSet<QuantifierProperty> properties;
@@ -48,7 +60,9 @@ public enum Quantifier {
        public enum QuantifierProperty {
                LOOPING,
                EAGER,
-               AT_LEAST_ONE
+               AT_LEAST_ONE,
+               STRICT,
+               TIMES
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 197767e..da5f413 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -591,8 +591,9 @@ public class NFAITCase extends TestLogger {
                inputEvents.add(new StreamRecord<>(startEvent, 1));
                inputEvents.add(new StreamRecord<>(middleEvent1, 3));
                inputEvents.add(new StreamRecord<>(middleEvent2, 4));
-               inputEvents.add(new StreamRecord<>(middleEvent3, 5));
-               inputEvents.add(new StreamRecord<>(end1, 6));
+               inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
+               inputEvents.add(new StreamRecord<>(middleEvent3, 6));
+               inputEvents.add(new StreamRecord<>(end1, 7));
 
                Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;
@@ -642,7 +643,6 @@ public class NFAITCase extends TestLogger {
                ), resultingPatterns);
        }
 
-
        @Test
        public void testBeginWithZeroOrMore() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
@@ -1129,7 +1129,7 @@ public class NFAITCase extends TestLogger {
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("middle");
                        }
-               }).zeroOrMore(false).followedBy("end").where(new 
SimpleCondition<Event>() {
+               }).zeroOrMore().consecutive().followedBy("end").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
7056763917392056548L;
 
                        @Override
@@ -1281,7 +1281,6 @@ public class NFAITCase extends TestLogger {
                ), resultingPatterns);
        }
 
-
        @Test
        public void testTimes() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
@@ -1659,9 +1658,293 @@ public class NFAITCase extends TestLogger {
                ), resultingPatterns);
        }
 
-       /**
-        * Clearing SharedBuffer
-        */
+
+       ///////////////////////////////         Consecutive           
////////////////////////////////////////
+
+       private static class ConsecutiveData {
+               static final Event startEvent = new Event(40, "c", 1.0);
+               static final Event middleEvent1 = new Event(41, "a", 2.0);
+               static final Event middleEvent2 = new Event(42, "a", 3.0);
+               static final Event middleEvent3 = new Event(43, "a", 4.0);
+               static final Event middleEvent4 = new Event(43, "a", 5.0);
+               static final Event end = new Event(44, "b", 5.0);
+
+               private ConsecutiveData() {
+               }
+       }
+
+       @Test
+       public void testStrictCombinationsOneOrMore() {
+               List<List<Event>> resultingPatterns = 
testStrictOneOrMore(false);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent3, ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent2, ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent3, ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent4, ConsecutiveData.end)
+               ));
+       }
+
+       @Test
+       public void testStrictEagerOneOrMore() {
+               List<List<Event>> resultingPatterns = testStrictOneOrMore(true);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent3, ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.end)
+               ));
+       }
+
+       private List<List<Event>> testStrictOneOrMore(boolean eager) {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
+               inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 2));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 5));
+               inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 6));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent4, 7));
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 8));
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).oneOrMore(eager).consecutive()
+                       .followedBy("end1").where(new SimpleCondition<Event>() {
+                               private static final long serialVersionUID = 
5726188262756267490L;
+
+                               @Override
+                               public boolean filter(Event value) throws 
Exception {
+                                       return value.getName().equals("b");
+                               }
+                       });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               return feedNFA(inputEvents, nfa);
+       }
+
+       @Test
+       public void testStrictEagerZeroOrMore() {
+               List<List<Event>> resultingPatterns = 
testStrictZeroOrMore(true);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.end)
+               ));
+       }
+
+       @Test
+       public void testStrictCombinationsZeroOrMore() {
+               List<List<Event>> resultingPatterns = 
testStrictZeroOrMore(false);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent2, ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent3, ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.end)
+               ));
+       }
+
+       private List<List<Event>> testStrictZeroOrMore(boolean eager) {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
+               inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 2));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+               inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).zeroOrMore(eager).consecutive().followedBy("end1").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               return feedNFA(inputEvents, nfa);
+       }
+
+
+       @Test
+       public void testTimesStrict() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).times(2).consecutive().followedBy("end1").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end)
+               ));
+       }
+
+       @Test
+       public void testTimesNonStrict() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).times(2).followedBy("end1").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, 
ConsecutiveData.end),
+                       Lists.newArrayList(ConsecutiveData.startEvent, 
ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end)
+               ));
+       }
+
+       @Test
+       public void testStartWithZeroOrMoreStrict() {
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).zeroOrMore().consecutive();
+
+               testStartWithOneOrZeroOrMoreStrict(pattern);
+       }
+
+       @Test
+       public void testStartWithOneOrMoreStrict() {
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).oneOrMore().consecutive();
+
+               testStartWithOneOrZeroOrMoreStrict(pattern);
+       }
+
+       private void testStartWithOneOrZeroOrMoreStrict(Pattern<Event, ?> 
pattern) {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
1));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
4));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+               inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(ConsecutiveData.middleEvent1),
+                       Lists.newArrayList(ConsecutiveData.middleEvent2, 
ConsecutiveData.middleEvent3),
+                       Lists.newArrayList(ConsecutiveData.middleEvent2),
+                       Lists.newArrayList(ConsecutiveData.middleEvent3)
+               ));
+       }
+
+       ///////////////////////////////     Clearing SharedBuffer     
////////////////////////////////////////
 
        @Test
        public void testTimesClearingBuffer() {
@@ -1934,17 +2217,7 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               List<List<Event>> resultingPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                                       inputEvent.getValue(),
-                                       inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> p: patterns) {
-                               resultingPatterns.add(new 
ArrayList<>(p.values()));
-                       }
-               }
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
                return resultingPatterns;
        }
@@ -2019,17 +2292,7 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               List<List<Event>> resultingPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                                       inputEvent.getValue(),
-                                       inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> p: patterns) {
-                               resultingPatterns.add(new 
ArrayList<>(p.values()));
-                       }
-               }
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
                return resultingPatterns;
        }
@@ -2068,17 +2331,7 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               List<List<Event>> resultingPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                                       inputEvent.getValue(),
-                                       inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> p: patterns) {
-                               resultingPatterns.add(new 
ArrayList<>(p.values()));
-                       }
-               }
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
                compareMaps(resultingPatterns,
                                Lists.<List<Event>>newArrayList(
@@ -2145,17 +2398,7 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               List<List<Event>> resultingPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                                       inputEvent.getValue(),
-                                       inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> p: patterns) {
-                               resultingPatterns.add(new 
ArrayList<>(p.values()));
-                       }
-               }
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
                compareMaps(resultingPatterns,
                                Lists.<List<Event>>newArrayList(
@@ -2212,17 +2455,7 @@ public class NFAITCase extends TestLogger {
 
                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
 
-               List<List<Event>> resultingPatterns = new ArrayList<>();
-
-               for (StreamRecord<Event> inputEvent : inputEvents) {
-                       Collection<Map<String, Event>> patterns = nfa.process(
-                                       inputEvent.getValue(),
-                                       inputEvent.getTimestamp()).f0;
-
-                       for (Map<String, Event> p: patterns) {
-                               resultingPatterns.add(new 
ArrayList<>(p.values()));
-                       }
-               }
+               List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
                compareMaps(resultingPatterns,
                                Lists.<List<Event>>newArrayList(
@@ -2237,6 +2470,21 @@ public class NFAITCase extends TestLogger {
                );
        }
 
+       private List<List<Event>> feedNFA(List<StreamRecord<Event>> 
inputEvents, NFA<Event> nfa) {
+               List<List<Event>> resultingPatterns = new ArrayList<>();
+
+               for (StreamRecord<Event> inputEvent : inputEvents) {
+                       Collection<Map<String, Event>> patterns = nfa.process(
+                               inputEvent.getValue(),
+                               inputEvent.getTimestamp()).f0;
+
+                       for (Map<String, Event> p: patterns) {
+                               resultingPatterns.add(new 
ArrayList<>(p.values()));
+                       }
+               }
+               return resultingPatterns;
+       }
+
        private void compareMaps(List<List<Event>> actual, List<List<Event>> 
expected) {
                Assert.assertEquals(expected.size(), actual.size());
 

Reply via email to