[FLINK-6165] [cep] Implement internal continuity for looping states.

Allows looping states (oneOrMore, zeroOrMore, times) to specify if they want their elements to be consecutive or allow non-matching elements in-between.
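
A rough, self-contained sketch of the difference between the two modes. It is not taken from the commit and does not use Flink CEP: the class ContinuitySketch and its matches helper are hypothetical names introduced only for illustration. It emulates, in a much simplified way, a "c" start event followed by an eager oneOrMore loop over "a" events and a "b" end event, using the sequence C D A1 A2 A3 D A4 B from the documentation added in this commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; this is NOT Flink CEP code and does not
// model the NFA. Event names are matched by their first letter.
class ContinuitySketch {

    // Returns the matches "C <taken A events> B" for an eager oneOrMore loop.
    // consecutive = true:  the first non-matching event after the loop has
    //                      started ends the loop (strict internal continuity).
    // consecutive = false: non-matching events inside the loop are skipped
    //                      (relaxed internal continuity).
    static List<String> matches(List<String> events, boolean consecutive) {
        List<String> result = new ArrayList<>();
        int start = indexOf(events, 'C', 0);
        int end = start < 0 ? -1 : indexOf(events, 'B', start + 1);
        if (end < 0) {
            return result;
        }
        List<String> taken = new ArrayList<>();
        for (int i = start + 1; i < end; i++) {
            String event = events.get(i);
            if (event.charAt(0) == 'A') {
                taken.add(event);
                result.add(events.get(start) + " " + String.join(" ", taken) + " " + events.get(end));
            } else if (consecutive && !taken.isEmpty()) {
                break; // strict continuity: the loop is broken
            }
            // otherwise the non-matching event is simply ignored
        }
        return result;
    }

    private static int indexOf(List<String> events, char firstLetter, int from) {
        for (int i = from; i < events.size(); i++) {
            if (events.get(i).charAt(0) == firstLetter) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("C", "D", "A1", "A2", "A3", "D", "A4", "B");
        System.out.println(matches(input, true));
        System.out.println(matches(input, false));
    }
}
```

With consecutive set to true the sketch stops at the second D and yields three matches; with false it also yields the match that contains A4. The D between C and A1 is skipped in both modes because the loop is attached with followedBy.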
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa3c395b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa3c395b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa3c395b

Branch: refs/heads/table-retraction
Commit: aa3c395b97943e312dd16964b363f0e4f86c6739
Parents: d4665a0
Author: Dawid Wysakowicz <[email protected]>
Authored: Mon Mar 27 15:05:11 2017 +0200
Committer: kl0u <[email protected]>
Committed: Thu Mar 30 10:24:19 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |  66 ++++
 .../flink/cep/scala/pattern/Pattern.scala       |  29 ++
 .../java/org/apache/flink/cep/nfa/State.java    |   6 +-
 .../flink/cep/nfa/compiler/NFACompiler.java     | 189 +++++-----
 .../org/apache/flink/cep/pattern/Pattern.java   |  71 ++++
 .../apache/flink/cep/pattern/Quantifier.java    |  18 +-
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 374 +++++++++++++++----
 7 files changed, 603 insertions(+), 150 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 932ba30..bb704c7 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -396,6 +396,7 @@ patternState.within(Time.seconds(10));
       <td>
         <p>Specifies that this pattern can occur zero or more times(kleene star). This means any number of events can be matched in this state.</p>
         <p>If eagerness is enabled(by default) for a pattern A*B and sequence A1 A2 B will generate patterns: B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.</p>
+        <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_java">consecutive</a></p>
 {% highlight java %}
 patternState.zeroOrMore();
 {% endhighlight %}
@@ -406,6 +407,7 @@ patternState.within(Time.seconds(10));
       <td>
         <p>Specifies that this pattern can occur one or more times(kleene star). This means at least one and at most infinite number of events can be matched in this state.</p>
         <p>If eagerness is enabled (by default) for a pattern A*B and sequence A1 A2 B will generate patterns: A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.</p>
+        <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_java">consecutive</a></p>
 {% highlight java %}
 patternState.oneOrMore();
 {% endhighlight %}
@@ -424,11 +426,50 @@ patternState.within(Time.seconds(10));
       <td><strong>Times</strong></td>
       <td>
         <p>Specifies exact number of times that this pattern should be matched.</p>
+        <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_java">consecutive</a></p>
 {% highlight java %}
 patternState.times(2);
 {% endhighlight %}
       </td>
     </tr>
+    <tr>
+      <td><strong>Consecutive</strong><a name="consecutive_java"></a></td>
+      <td>
+        <p>Works in conjunction with zeroOrMore, oneOrMore or times. Specifies that any not matching element breaks the loop.</p>
+
+        <p>If not applied a relaxed continuity (as in followedBy) is used.</p>
+
+        <p>E.g. a pattern like:</p>
+{% highlight java %}
+Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return value.getName().equals("c");
+    }
+})
+.followedBy("middle").where(new SimpleCondition<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return value.getName().equals("a");
+    }
+})
+.oneOrMore(true).consecutive()
+.followedBy("end1").where(new SimpleCondition<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return value.getName().equals("b");
+    }
+});
+{% endhighlight %}
+
+        <p>Will generate the following matches for a sequence: C D A1 A2 A3 D A4 B</p>
+
+        <p>with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}</p>
+        <p>without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}</p>
+
+        <p><b>NOTICE:</b> This option can be applied only to zeroOrMore(), oneOrMore() and times()!</p>
+      </td>
+    </tr>
   </tbody>
 </table>
 </div>
@@ -511,6 +552,7 @@ patternState.within(Time.seconds(10))
       <td>
         <p>Specifies that this pattern can occur zero or more times(kleene star). This means any number of events can be matched in this state.</p>
         <p>If eagerness is enabled(by default) for a pattern A*B and sequence A1 A2 B will generate patterns: B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.</p>
+        <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_scala">consecutive</a></p>
 {% highlight scala %}
 patternState.zeroOrMore()
 {% endhighlight %}
@@ -521,6 +563,7 @@ patternState.within(Time.seconds(10))
       <td>
         <p>Specifies that this pattern can occur one or more times(kleene star). This means at least one and at most infinite number of events can be matched in this state.</p>
         <p>If eagerness is enabled (by default) for a pattern A*B and sequence A1 A2 B will generate patterns: A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.</p>
+        <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_scala">consecutive</a></p>
 {% highlight scala %}
 patternState.oneOrMore()
 {% endhighlight %}
@@ -539,11 +582,34 @@ patternState.within(Time.seconds(10))
       <td><strong>Times</strong></td>
       <td>
         <p>Specifies exact number of times that this pattern should be matched.</p>
+        <p>By default a relaxed internal continuity (between subsequent events of a loop) is used. For more info on the internal continuity see <a href="#consecutive_scala">consecutive</a></p>
 {% highlight scala %}
 patternState.times(2)
 {% endhighlight %}
       </td>
     </tr>
+    <tr>
+      <td><strong>Consecutive</strong><a name="consecutive_scala"></a></td>
+      <td>
+        <p>Works in conjunction with zeroOrMore, oneOrMore or times. Specifies that any not matching element breaks the loop.</p>
+
+        <p>If not applied a relaxed continuity (as in followedBy) is used.</p>
+
+{% highlight scala %}
+Pattern.begin("start").where(_.getName().equals("c"))
+  .followedBy("middle").where(_.getName().equals("a"))
+  .oneOrMore(true).consecutive()
+  .followedBy("end1").where(_.getName().equals("b"));
+{% endhighlight %}
+
+        <p>Will generate the following matches for a sequence: C D A1 A2 A3 D A4 B</p>
+
+        <p>with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}</p>
+        <p>without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}</p>
+
+        <p><b>NOTICE:</b> This option can be applied only to zeroOrMore(), oneOrMore() and times()!</p>
+      </td>
+    </tr>
   </tbody>
 </table>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 07dfc5a..c636029 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -270,6 +270,35 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     this
   }
+
+  /**
+    * Works in conjunction with [[org.apache.flink.cep.scala.pattern.Pattern#zeroOrMore()]],
+    * [[org.apache.flink.cep.scala.pattern.Pattern#oneOrMore()]] or
+    * [[org.apache.flink.cep.scala.pattern.Pattern#times(int)]].
+    * Specifies that any not matching element breaks the loop.
+    *
+    * <p>E.g. a pattern like:
+    * {{{
+    * Pattern.begin("start").where(_.getName().equals("c"))
+    * .followedBy("middle").where(_.getName().equals("a")).oneOrMore(true).consecutive()
+    * .followedBy("end1").where(_.getName().equals("b"));
+    * }}}
+    *
+    * <p>for a sequence: C D A1 A2 A3 D A4 B
+    *
+    * <p>will generate matches: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}
+    *
+    * <p><b>NOTICE:</b> This operator can be applied only when either zeroOrMore,
+    * oneOrMore or times was previously applied!
+    *
+    * <p>By default a relaxed continuity is applied.
+    * @return pattern with continuity changed to strict
+    */
+  def consecutive(): Pattern[T, F] = {
+    jPattern.consecutive()
+    this
+  }
+
 }

 object Pattern {

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index c673576..2503ffd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -41,7 +41,7 @@ public class State<T> implements Serializable {
     private static final long serialVersionUID = 6658700025989097781L;

     private final String name;
-    private final StateType stateType;
+    private StateType stateType;
     private final Collection<StateTransition<T>> stateTransitions;

     public State(final String name, final StateType stateType) {
@@ -65,6 +65,10 @@ public class State<T> implements Serializable {
         return stateTransitions;
     }

+    public void makeStart() {
+        this.stateType = StateType.Start;
+    }
+
     private void addStateTransition(
         final StateTransitionAction action,
         final State<T> targetState,

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 4fb918f..e441c4b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -152,6 +152,7 @@ public class NFACompiler {

         /**
          * Creates all the states between Start and Final state.
+         *
          * @param sinkState the state that last state should point to (always the Final state)
          * @return the next state after Start in the resulting graph
          */
@@ -160,27 +161,25 @@ public class NFACompiler {
             State<T> lastSink = sinkState;
             while (currentPattern.getPrevious() != null) {
                 checkPatternNameUniqueness();
-
-                State<T> sourceState = new State<>(currentPattern.getName(), State.StateType.Normal);
-                states.add(sourceState);
-                usedNames.add(sourceState.getName());
+                usedNames.add(currentPattern.getName());

                 if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
-                    convertToLooping(sourceState, lastSink);
+                    final State<T> looping = createLooping(lastSink);

                     if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
-                        sourceState = createFirstMandatoryStateOfLoop(sourceState, State.StateType.Normal);
-                        states.add(sourceState);
-                        usedNames.add(sourceState.getName());
+                        lastSink = createFirstMandatoryStateOfLoop(looping);
+                    } else if (currentPattern instanceof FollowedByPattern &&
+                        currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT)) {
+                        lastSink = createWaitingStateForZeroOrMore(looping, lastSink);
+                    } else {
+                        lastSink = looping;
                     }
-                } else if (currentPattern.getQuantifier() == Quantifier.TIMES) {
-                    sourceState = convertToTimesState(sourceState, lastSink, currentPattern.getTimes());
+                } else if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.TIMES)) {
+                    lastSink = createTimesState(lastSink, currentPattern.getTimes());
                 } else {
-                    convertToSingletonState(sourceState, lastSink);
+                    lastSink = createSingletonState(lastSink);
                 }
-
                 currentPattern = currentPattern.getPrevious();
-                lastSink = sourceState;

                 final Time currentWindowTime = currentPattern.getWindowTime();
                 if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) {
@@ -192,6 +191,30 @@ public class NFACompiler {
             return lastSink;
         }

+        /**
+         * Creates a pair of states that enables relaxed strictness before a zeroOrMore looping state.
+         *
+         * @param loopingState the first state of zeroOrMore complex state
+         * @param lastSink the state that the looping one points to
+         * @return the newly created state
+         */
+        private State<T> createWaitingStateForZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
+            final State<T> followByState = createNormalState();
+            final State<T> followByStateWithoutProceed = createNormalState();
+
+            final IterativeCondition<T> currentFunction = (IterativeCondition<T>)currentPattern.getCondition();
+            final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
+
+            followByState.addProceed(lastSink, BooleanConditions.<T>trueFunction());
+            followByState.addIgnore(followByStateWithoutProceed, ignoreFunction);
+            followByState.addTake(loopingState, currentFunction);
+
+            followByStateWithoutProceed.addIgnore(ignoreFunction);
+            followByStateWithoutProceed.addTake(loopingState, currentFunction);
+
+            return followByState;
+        }
+
         private void checkPatternNameUniqueness() {
             if (usedNames.contains(currentPattern.getName())) {
                 throw new MalformedPatternException(
@@ -202,110 +225,112 @@ public class NFACompiler {

         /**
          * Creates the Start {@link State} of the resulting NFA graph.
+         *
          * @param sinkState the state that Start state should point to (alwyas first state of middle states)
          * @return created state
          */
         @SuppressWarnings("unchecked")
         private State<T> createStartState(State<T> sinkState) {
             checkPatternNameUniqueness();
+            usedNames.add(currentPattern.getName());

             final State<T> beginningState;
             if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
-                final State<T> loopingState;
+                final State<T> loopingState = createLooping(sinkState);
                 if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
-                    loopingState = new State<>(currentPattern.getName(), State.StateType.Normal);
-                    beginningState = createFirstMandatoryStateOfLoop(loopingState, State.StateType.Start);
-                    states.add(loopingState);
+                    beginningState = createFirstMandatoryStateOfLoop(loopingState);
                 } else {
-                    loopingState = new State<>(currentPattern.getName(), State.StateType.Start);
                     beginningState = loopingState;
                 }
-                convertToLooping(loopingState, sinkState, true);
-            } else {
-                if (currentPattern.getQuantifier() == Quantifier.TIMES && currentPattern.getTimes() > 1) {
-                    final State<T> timesState = new State<>(currentPattern.getName(), State.StateType.Normal);
-                    states.add(timesState);
-                    sinkState = convertToTimesState(timesState, sinkState, currentPattern.getTimes() - 1);
-                }
-
-                beginningState = new State<>(currentPattern.getName(), State.StateType.Start);
-                convertToSingletonState(beginningState, sinkState);
+            } else if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.TIMES)) {
+                beginningState = createTimesState(sinkState, currentPattern.getTimes());
+            } else {
+                beginningState = createSingletonState(sinkState);
             }

-            states.add(beginningState);
-            usedNames.add(beginningState.getName());
+            beginningState.makeStart();

             return beginningState;
         }

         /**
-         * Converts the given state into a "complex" state consisting of given number of states with
+         * Creates a "complex" state consisting of given number of states with
          * same {@link IterativeCondition}
          *
-         * @param sourceState the state to be converted
-         * @param sinkState the state that the converted state should point to
-         * @param times number of times the state should be copied
+         * @param sinkState the state that the created state should point to
+         * @param times number of times the state should be copied
          * @return the first state of the "complex" state, next state should point to it
          */
-        private State<T> convertToTimesState(final State<T> sourceState, final State<T> sinkState, int times) {
-            convertToSingletonState(sourceState, sinkState);
-            State<T> lastSink;
-            State<T> firstState = sourceState;
+        private State<T> createTimesState(final State<T> sinkState, int times) {
+            State<T> lastSink = sinkState;
             for (int i = 0; i < times - 1; i++) {
-                lastSink = firstState;
-                firstState = new State<>(currentPattern.getName(), State.StateType.Normal);
-                states.add(firstState);
-                convertToSingletonState(firstState, lastSink);
+                lastSink = createSingletonState(
+                    lastSink,
+                    currentPattern instanceof FollowedByPattern &&
+                    !currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT));
             }
-            return firstState;
+            return createSingletonState(lastSink, currentPattern instanceof FollowedByPattern);
         }

         /**
-         * Converts the given state into a simple single state. For an OPTIONAL state it also consists
+         * Creates a simple single state. For an OPTIONAL state it also consists
          * of a similar state without the PROCEED edge, so that for each PROCEED transition branches
          * in computation state graph can be created only once.
          *
-         * @param sourceState the state to be converted
          * @param sinkState state that the state being converted should point to
+         * @return the created state
          */
         @SuppressWarnings("unchecked")
-        private void convertToSingletonState(final State<T> sourceState, final State<T> sinkState) {
+        private State<T> createSingletonState(final State<T> sinkState) {
+            return createSingletonState(sinkState, currentPattern instanceof FollowedByPattern);
+        }

+        /**
+         * Creates a simple single state. For an OPTIONAL state it also consists
+         * of a similar state without the PROCEED edge, so that for each PROCEED transition branches
+         * in computation state graph can be created only once.
+         *
+         * @param addIgnore if any IGNORE should be added
+         * @param sinkState state that the state being converted should point to
+         * @return the created state
+         */
+        @SuppressWarnings("unchecked")
+        private State<T> createSingletonState(final State<T> sinkState, boolean addIgnore) {
             final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
             final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
-            sourceState.addTake(sinkState, currentFilterFunction);

+            final State<T> singletonState = createNormalState();
+            singletonState.addTake(sinkState, currentFilterFunction);
             if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) {
-                sourceState.addProceed(sinkState, trueFunction);
+                singletonState.addProceed(sinkState, trueFunction);
             }

-            if (currentPattern instanceof FollowedByPattern) {
+            if (addIgnore) {
                 final State<T> ignoreState;
                 if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) {
-                    ignoreState = new State<>(currentPattern.getName(), State.StateType.Normal);
+                    ignoreState = createNormalState();
                     ignoreState.addTake(sinkState, currentFilterFunction);
-                    states.add(ignoreState);
                 } else {
-                    ignoreState = sourceState;
+                    ignoreState = singletonState;
                 }
-                sourceState.addIgnore(ignoreState, trueFunction);
+                singletonState.addIgnore(ignoreState, trueFunction);
             }
+            return singletonState;
         }

         /**
-         * Patterns with quantifiers AT_LEAST_ONE_* are converted into pair of states: a singleton state and
+         * Patterns with quantifiers AT_LEAST_ONE_* are created as a pair of states: a singleton state and
          * looping state. This method creates the first of the two.
          *
          * @param sinkState the state the newly created state should point to, it should be a looping state
-         * @param stateType the type of the created state, as the NFA graph can also start wit AT_LEAST_ONE_*
          * @return the newly created state
          */
         @SuppressWarnings("unchecked")
-        private State<T> createFirstMandatoryStateOfLoop(final State<T> sinkState, final State.StateType stateType) {
+        private State<T> createFirstMandatoryStateOfLoop(final State<T> sinkState) {

             final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
-            final State<T> firstState = new State<>(currentPattern.getName(), stateType);
+            final State<T> firstState = createNormalState();

             firstState.addTake(sinkState, currentFilterFunction);
             if (currentPattern instanceof FollowedByPattern) {
@@ -316,49 +341,45 @@ public class NFACompiler {
         }

         /**
-         * Converts the given state into looping one. Looping state is one with TAKE edge to itself and
+         * Creates the given state as a looping one. Looping state is one with TAKE edge to itself and
          * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
          * for each PROCEED transition branches in computation state graph can be created only once.
          *
-         * <p>If this looping state is first of a graph we should treat the {@link Pattern} as {@link FollowedByPattern}
-         * to enable combinations.
-         *
-         * @param sourceState the state to converted
-         * @param sinkState the state that the converted state should point to
-         * @param isFirstState if the looping state is first of a graph
+         * @param sinkState the state that the converted state should point to
+         * @return the first state of the created complex state
          */
         @SuppressWarnings("unchecked")
-        private void convertToLooping(final State<T> sourceState, final State<T> sinkState, boolean isFirstState) {
+        private State<T> createLooping(final State<T> sinkState) {

+            final State<T> loopingState = createNormalState();
             final IterativeCondition<T> filterFunction = (IterativeCondition<T>) currentPattern.getCondition();
-            final IterativeCondition<T> trueFunction = BooleanConditions.<T>trueFunction();
+            final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();

-            sourceState.addProceed(sinkState, trueFunction);
-            sourceState.addTake(filterFunction);
-            if (currentPattern instanceof FollowedByPattern || isFirstState) {
-                final State<T> ignoreState = new State<>(
-                    currentPattern.getName(),
-                    State.StateType.Normal);
+            loopingState.addProceed(sinkState, trueFunction);
+            loopingState.addTake(filterFunction);
+            if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.STRICT)) {
+                final State<T> ignoreState = createNormalState();

                 final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);

-                sourceState.addIgnore(ignoreState, ignoreCondition);
-                ignoreState.addTake(sourceState, filterFunction);
-                ignoreState.addIgnore(ignoreState, ignoreCondition);
-                states.add(ignoreState);
+                ignoreState.addTake(loopingState, filterFunction);
+                ignoreState.addIgnore(ignoreCondition);
+                loopingState.addIgnore(ignoreState, ignoreCondition);
             }
+
+            return loopingState;
         }

         /**
-         * Converts the given state into looping one. Looping state is one with TAKE edge to itself and
-         * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
-         * for each PROCEED transition branches in computation state graph can be created only once.
+         * Creates a state with {@link State.StateType#Normal} and adds it to the collection of created states.
+         * Should be used instead of instantiating with new operator.
          *
-         * @param sourceState the state to converted
-         * @param sinkState the state that the converted state should point to
+         * @return the created state
          */
-        private void convertToLooping(final State<T> sourceState, final State<T> sinkState) {
-            convertToLooping(sourceState, sinkState, false);
+        private State<T> createNormalState() {
+            final State<T> state = new State<>(currentPattern.getName(), State.StateType.Normal);
+            states.add(state);
+            return state;
         }

         /**
@@ -381,7 +402,7 @@ public class NFACompiler {
      * has at most one TAKE and one IGNORE and name of each state is unique. No PROCEED transition is allowed!
      *
      * @param oldStartState dummy start state of old graph
-     * @param <T> type of events
+     * @param <T> type of events
      * @return map of new states, where key is the name of a state and value is the state itself
      */
     @Internal

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index cd51788..14c3e2d 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -267,6 +267,76 @@ public class Pattern<T, F extends T> {
     }

     /**
+     * Works in conjunction with {@link Pattern#zeroOrMore()}, {@link Pattern#oneOrMore()} or {@link Pattern#times(int)}.
+     * Specifies that any not matching element breaks the loop.
+     *
+     * <p>E.g. a pattern like:
+     * <pre>{@code
+     * Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+     *     @Override
+     *     public boolean filter(Event value) throws Exception {
+     *         return value.getName().equals("c");
+     *     }
+     * })
+     * .followedBy("middle").where(new FilterFunction<Event>() {
+     *     @Override
+     *     public boolean filter(Event value) throws Exception {
+     *         return value.getName().equals("a");
+     *     }
+     * })
+     * }<b>.oneOrMore(true).consecutive()</b>{@code
+     * .followedBy("end1").where(new FilterFunction<Event>() {
+     *     @Override
+     *     public boolean filter(Event value) throws Exception {
+     *         return value.getName().equals("b");
+     *     }
+     * });
+     * }</pre>
+     *
+     * <p>for a sequence: C D A1 A2 A3 D A4 B
+     *
+     * <p>will generate matches: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}
+     *
+     * <p><b>NOTICE:</b> This operator can be applied only when either zeroOrMore,
+     * oneOrMore or times was previously applied!
+     *
+     * <p>By default a relaxed continuity is applied.
+     *
+     * @return pattern with continuity changed to strict
+     */
+    public Pattern<T, F> consecutive() {
+        switch (this.quantifier) {
+
+            case ZERO_OR_MORE_EAGER:
+                this.quantifier = Quantifier.ZERO_OR_MORE_EAGER_STRICT;
+                break;
+            case ZERO_OR_MORE_COMBINATIONS:
+                this.quantifier = Quantifier.ZERO_OR_MORE_COMBINATIONS_STRICT;
+                break;
+            case ONE_OR_MORE_EAGER:
+                this.quantifier = Quantifier.ONE_OR_MORE_EAGER_STRICT;
+                break;
+            case ONE_OR_MORE_COMBINATIONS:
+                this.quantifier = Quantifier.ONE_OR_MORE_COMBINATIONS_STRICT;
+                break;
+            case TIMES:
+                this.quantifier = Quantifier.TIMES_STRICT;
+                break;
+            case ZERO_OR_MORE_COMBINATIONS_STRICT:
+            case ONE_OR_MORE_EAGER_STRICT:
+            case ONE_OR_MORE_COMBINATIONS_STRICT:
+            case ZERO_OR_MORE_EAGER_STRICT:
+            case TIMES_STRICT:
+                throw new MalformedPatternException("Strict continuity already applied! consecutive() called twice.");
+            case ONE:
+            case OPTIONAL:
+                throw new MalformedPatternException("Strict continuity cannot be applied to " + this.quantifier);
+        }
+
+        return this;
+    }
+
+    /**
      * Specifies that this pattern can occur zero or once.
      *
      * @return The same pattern with applied Kleene ? operator
@@ -300,4 +370,5 @@ public class Pattern<T, F extends T> {
             throw new MalformedPatternException("Already applied quantifier to this Pattern. Current quantifier is: " + this.quantifier);
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index 7abe9bd..9789072 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -23,12 +23,24 @@ public enum Quantifier {
     ONE,
     ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, QuantifierProperty.EAGER),
     ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING),
+    ZERO_OR_MORE_EAGER_STRICT(QuantifierProperty.EAGER, QuantifierProperty.STRICT, QuantifierProperty.LOOPING),
+    ZERO_OR_MORE_COMBINATIONS_STRICT(QuantifierProperty.STRICT, QuantifierProperty.LOOPING),
     ONE_OR_MORE_EAGER(
         QuantifierProperty.LOOPING,
         QuantifierProperty.EAGER,
         QuantifierProperty.AT_LEAST_ONE),
+    ONE_OR_MORE_EAGER_STRICT(
+        QuantifierProperty.STRICT,
+        QuantifierProperty.LOOPING,
+        QuantifierProperty.EAGER,
+        QuantifierProperty.AT_LEAST_ONE),
     ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, QuantifierProperty.AT_LEAST_ONE),
-    TIMES,
+    ONE_OR_MORE_COMBINATIONS_STRICT(
+        QuantifierProperty.STRICT,
+        QuantifierProperty.LOOPING,
+        QuantifierProperty.AT_LEAST_ONE),
+    TIMES(QuantifierProperty.TIMES),
+    TIMES_STRICT(QuantifierProperty.TIMES, QuantifierProperty.STRICT),
     OPTIONAL;

     private final EnumSet<QuantifierProperty> properties;
@@ -48,7 +60,9 @@ public enum Quantifier {
     public enum QuantifierProperty {
         LOOPING,
         EAGER,
-        AT_LEAST_ONE
+        AT_LEAST_ONE,
+        STRICT,
+        TIMES
     }

 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aa3c395b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 197767e..da5f413 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -591,8 +591,9 @@ public class NFAITCase extends TestLogger {
         inputEvents.add(new StreamRecord<>(startEvent, 1));
         inputEvents.add(new StreamRecord<>(middleEvent1, 3));
         inputEvents.add(new StreamRecord<>(middleEvent2, 4));
-        inputEvents.add(new StreamRecord<>(middleEvent3, 5));
-        inputEvents.add(new StreamRecord<>(end1, 6));
+        inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
+        inputEvents.add(new StreamRecord<>(middleEvent3, 6));
+        inputEvents.add(new StreamRecord<>(end1, 7));

         Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
             private static final long serialVersionUID = 5726188262756267490L;
@@ -642,7 +643,6 @@ public class NFAITCase extends TestLogger {
         ), resultingPatterns);
     }

-
     @Test
     public void testBeginWithZeroOrMore() {
         List<StreamRecord<Event>> inputEvents = new ArrayList<>();
@@ -1129,7 +1129,7 @@ public class NFAITCase extends TestLogger {
             public boolean filter(Event value) throws Exception {
                 return value.getName().equals("middle");
             }
-        }).zeroOrMore(false).followedBy("end").where(new SimpleCondition<Event>() {
+        }).zeroOrMore().consecutive().followedBy("end").where(new SimpleCondition<Event>() {
             private static final long serialVersionUID = 7056763917392056548L;

             @Override
@@ -1281,7 +1281,6 @@ public class NFAITCase extends TestLogger {
         ), resultingPatterns);
     }

-
     @Test
     public void testTimes() {
         List<StreamRecord<Event>> inputEvents = new ArrayList<>();
@@ -1659,9 +1658,293 @@ public class NFAITCase extends TestLogger {
         ), resultingPatterns);
     }

-    /**
-     * Clearing SharedBuffer
-     */
+
+    /////////////////////////////// Consecutive ////////////////////////////////////////
+
+    private static class ConsecutiveData {
+        static final Event startEvent = new Event(40, "c", 1.0);
+        static final Event middleEvent1 = new Event(41, "a", 2.0);
+        static final Event middleEvent2 = new Event(42, "a", 3.0);
+        static final Event middleEvent3 = new Event(43, "a", 4.0);
+        static final Event middleEvent4 = new Event(43, "a", 5.0);
+        static final Event end = new Event(44, "b", 5.0);
+
+        private ConsecutiveData() {
+        }
+    }
+
+    @Test
+    public void testStrictCombinationsOneOrMore() {
+        List<List<Event>> resultingPatterns = testStrictOneOrMore(false);
+
+        compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+            Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+            Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+            Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
+            Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+            Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+            Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+            Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent4, ConsecutiveData.end)
+        ));
+    }
+
+    @Test
+    public void testStrictEagerOneOrMore() {
+        List<List<Event>> resultingPatterns = testStrictOneOrMore(true);
+
+        compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+            Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+            Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+            Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end)
+        ));
+    }
+
+    private List<List<Event>> testStrictOneOrMore(boolean eager) {
+        List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+        inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+        inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 2));
+        inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+        inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+        inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 5));
+        inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 6));
+        inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent4, 7));
+        inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 8));
+
+        Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("c");
+            }
+        }).followedBy("middle").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 5726188262756267490L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("a");
+            }
+        }).oneOrMore(eager).consecutive()
+            .followedBy("end1").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID
= 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + return feedNFA(inputEvents, nfa); + } + + @Test + public void testStrictEagerZeroOrMore() { + List<List<Event>> resultingPatterns = testStrictZeroOrMore(true); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + @Test + public void testStrictCombinationsZeroOrMore() { + List<List<Event>> resultingPatterns = testStrictZeroOrMore(false); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end) + )); + } + + private List<List<Event>> testStrictZeroOrMore(boolean eager) { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5)); + inputEvents.add(new 
StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).zeroOrMore(eager).consecutive().followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + return feedNFA(inputEvents, nfa); + } + + + @Test + public void testTimesStrict() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static 
final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2).consecutive().followedBy("end1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end) + )); + } + + @Test + public void testTimesNonStrict() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2).followedBy("end1").where(new SimpleCondition<Event>() { + private 
static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end), + Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end) + )); + } + + @Test + public void testStartWithZeroOrMoreStrict() { + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).zeroOrMore().consecutive(); + + testStartWithOneOrZeroOrMoreStrict(pattern); + } + + @Test + public void testStartWithOneOrMoreStrict() { + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().consecutive(); + + testStartWithOneOrZeroOrMoreStrict(pattern); + } + + private void testStartWithOneOrZeroOrMoreStrict(Pattern<Event, ?> pattern) { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 
4)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5)); + inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6)); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(ConsecutiveData.middleEvent1), + Lists.newArrayList(ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3), + Lists.newArrayList(ConsecutiveData.middleEvent2), + Lists.newArrayList(ConsecutiveData.middleEvent3) + )); + } + + /////////////////////////////// Clearing SharedBuffer //////////////////////////////////////// @Test public void testTimesClearingBuffer() { @@ -1934,17 +2217,7 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - List<List<Event>> resultingPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> p: patterns) { - resultingPatterns.add(new ArrayList<>(p.values())); - } - } + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); return resultingPatterns; } @@ -2019,17 +2292,7 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - List<List<Event>> resultingPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> p: patterns) { - resultingPatterns.add(new ArrayList<>(p.values())); - } - } + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); return resultingPatterns; } @@ -2068,17 +2331,7 @@ public class NFAITCase extends TestLogger { 
NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - List<List<Event>> resultingPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> p: patterns) { - resultingPatterns.add(new ArrayList<>(p.values())); - } - } + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( @@ -2145,17 +2398,7 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - List<List<Event>> resultingPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> p: patterns) { - resultingPatterns.add(new ArrayList<>(p.values())); - } - } + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( @@ -2212,17 +2455,7 @@ public class NFAITCase extends TestLogger { NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - List<List<Event>> resultingPatterns = new ArrayList<>(); - - for (StreamRecord<Event> inputEvent : inputEvents) { - Collection<Map<String, Event>> patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map<String, Event> p: patterns) { - resultingPatterns.add(new ArrayList<>(p.values())); - } - } + List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( @@ -2237,6 +2470,21 @@ public class NFAITCase extends TestLogger { ); } + private List<List<Event>> feedNFA(List<StreamRecord<Event>> inputEvents, NFA<Event> nfa) { + List<List<Event>> resultingPatterns = new 
ArrayList<>(); + + for (StreamRecord<Event> inputEvent : inputEvents) { + Collection<Map<String, Event>> patterns = nfa.process( + inputEvent.getValue(), + inputEvent.getTimestamp()).f0; + + for (Map<String, Event> p: patterns) { + resultingPatterns.add(new ArrayList<>(p.values())); + } + } + return resultingPatterns; + } + private void compareMaps(List<List<Event>> actual, List<List<Event>> expected) { Assert.assertEquals(expected.size(), actual.size());
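
Note for readers of this diff: the difference that testTimesStrict and testTimesNonStrict assert can be sketched without Flink. What follows is a toy model only, not the NFA produced by NFACompiler. The class ContiguitySketch, its match method, and the string event names (a1, a2, a3 standing in for middleEvent1..3) are hypothetical and exist purely for illustration. The sketch assumes a fixed pattern of "c", followed by a loop of `times` middle events, followed by "b", with relaxed continuity into and out of the loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model (assumption, not Flink code): events are plain strings and the
// pattern is fixed to "c" followedBy a{times} followedBy "b".
class ContiguitySketch {

	// Returns every match as the list of matched event names, in input order.
	static List<List<String>> match(List<String> events, int times, boolean strict) {
		List<List<String>> results = new ArrayList<>();
		for (int start = 0; start < events.size(); start++) {
			if (events.get(start).equals("c")) {
				List<String> partial = new ArrayList<>();
				partial.add("c");
				loop(events, start + 1, times, strict, false, partial, results);
			}
		}
		return results;
	}

	private static void loop(List<String> events, int from, int remaining, boolean strict,
			boolean started, List<String> partial, List<List<String>> results) {
		if (remaining == 0) {
			// Relaxed continuity between the loop and "end": skip to the next "b".
			for (int i = from; i < events.size(); i++) {
				if (events.get(i).equals("b")) {
					List<String> done = new ArrayList<>(partial);
					done.add("b");
					results.add(done);
					return;
				}
			}
			return;
		}
		for (int i = from; i < events.size(); i++) {
			if (events.get(i).startsWith("a")) {
				partial.add(events.get(i));
				loop(events, i + 1, remaining - 1, strict, true, partial, results);
				partial.remove(partial.size() - 1);
			}
			// Strict internal continuity: once the loop has taken an element, only the
			// immediately following event may extend it, so stop scanning here.
			if (strict && started) {
				break;
			}
		}
	}

	public static void main(String[] args) {
		// Same shape as the test input: c, f, a1, f, a2, a3, b
		List<String> input = Arrays.asList("c", "f", "a1", "f", "a2", "a3", "b");
		System.out.println(match(input, 2, true));  // [[c, a2, a3, b]]
		System.out.println(match(input, 2, false)); // [[c, a1, a2, b], [c, a1, a3, b], [c, a2, a3, b]]
	}
}
```

With the strict flag set, the "f" between a1 and a2 breaks the loop, so only the a2, a3 pair survives. Without it, every ordered pair of middle events is reported, which mirrors the three lists expected by testTimesNonStrict.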
