Repository: flink Updated Branches: refs/heads/master b3ffd919f -> d21d5d632
[FLINK-7147] [cep] Support greedy quantifier in CEP This closes #4296. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d21d5d63 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d21d5d63 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d21d5d63 Branch: refs/heads/master Commit: d21d5d632374f9915140190d4d249379a72c90cf Parents: b3ffd91 Author: Dian Fu <fudian...@alibaba-inc.com> Authored: Tue Jul 11 16:03:42 2017 +0800 Committer: Dawid Wysakowicz <dwysakow...@apache.org> Committed: Thu Aug 24 09:04:02 2017 +0200 ---------------------------------------------------------------------- docs/dev/libs/cep.md | 61 +- .../flink/cep/scala/pattern/Pattern.scala | 12 + .../apache/flink/cep/nfa/StateTransition.java | 4 + .../flink/cep/nfa/compiler/NFACompiler.java | 86 +- .../org/apache/flink/cep/pattern/Pattern.java | 27 + .../apache/flink/cep/pattern/Quantifier.java | 12 +- .../org/apache/flink/cep/nfa/GreedyITCase.java | 907 +++++++++++++++++++ 7 files changed, 1093 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d21d5d63/docs/dev/libs/cep.md ---------------------------------------------------------------------- diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md index 4b13bb3..91125ca 100644 --- a/docs/dev/libs/cep.md +++ b/docs/dev/libs/cep.md @@ -163,8 +163,9 @@ In FlinkCEP, looping patterns can be specified using these methods: `pattern.one more occurrences of a given event (e.g. the `b+` mentioned previously); and `pattern.times(#ofTimes)`, for patterns that expect a specific number of occurrences of a given type of event, e.g. 4 `a`'s; and `pattern.times(#fromTimes, #toTimes)`, for patterns that expect a specific minimum number of occurrences and maximum number of occurrences of a given type of event, -e.g. 2-4 `a`s. All patterns, looping or not, can be made optional using the `pattern.optional()` method. For a pattern -named `start`, the following are valid quantifiers: +e.g. 2-4 `a`s. Looping patterns can be made greedy using the `pattern.greedy()` method and group pattern cannot be made greedy +currently. All patterns, looping or not, can be made optional using the `pattern.optional()` method. +For a pattern named `start`, the following are valid quantifiers: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -178,21 +179,35 @@ named `start`, the following are valid quantifiers: // expecting 2, 3 or 4 occurrences start.times(2, 4); + // expecting 2, 3 or 4 occurrences and repeating as many as possible + start.times(2, 4).greedy(); + // expecting 0, 2, 3 or 4 occurrences start.times(2, 4).optional(); + // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible + start.times(2, 4).optional().greedy(); + // expecting 1 or more occurrences start.oneOrMore(); + // expecting 1 or more occurrences and repeating as many as possible + start.oneOrMore().greedy(); + // expecting 0 or more occurrences start.oneOrMore().optional(); + // expecting 0 or more occurrences and repeating as many as possible + start.oneOrMore().optional().greedy(); + // expecting 2 or more occurrences start.timesOrMore(2); - // expecting 0, 2 or more occurrences - start.timesOrMore(2).optional(); + // expecting 2 or more occurrences and repeating as many as possible + start.timesOrMore(2).greedy(); + // expecting 0, 2 or more occurrences and repeating as many as possible + start.timesOrMore(2).optional().greedy(); {% endhighlight %} </div> @@ -207,20 +222,38 @@ named `start`, the following are valid quantifiers: // expecting 2, 3 or 4 occurrences start.times(2, 4); + // expecting 2, 3 or 4 occurrences and repeating as many as possible + start.times(2, 4).greedy(); + // expecting 0, 2, 3 or 4 occurrences start.times(2, 4).optional(); + // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible + start.times(2, 4).optional().greedy(); + // expecting 1 or more occurrences start.oneOrMore() + // expecting 1 or more occurrences and repeating as many as possible + start.oneOrMore().greedy(); + // expecting 0 or more occurrences start.oneOrMore().optional() + // expecting 0 or more occurrences and repeating as many as possible + start.oneOrMore().optional().greedy(); + // expecting 2 or more occurrences start.timesOrMore(2); + // expecting 2 or more occurrences and repeating as many as possible + start.timesOrMore(2).greedy(); + // expecting 0, 2 or more occurrences start.timesOrMore(2).optional(); + + // expecting 0, 2 or more occurrences and repeating as many as possible + start.timesOrMore(2).optional().greedy(); {% endhighlight %} </div> </div> @@ -536,6 +569,16 @@ pattern.oneOrMore().optional(); </td> </tr> <tr> + <td><strong>greedy()</strong></td> + <td> + <p>Specifies that this pattern is greedy, i.e. it will repeat as many as possible. This is only applicable + to quantifiers and it does not support group pattern currently.</p> +{% highlight java %} +pattern.oneOrMore().greedy(); +{% endhighlight %} + </td> + </tr> + <tr> <td><strong>consecutive()</strong><a name="consecutive_java"></a></td> <td> <p>Works in conjunction with <code>oneOrMore()</code> and <code>times()</code> and imposes strict contiguity between the matching @@ -718,6 +761,16 @@ pattern.oneOrMore().optional() </td> </tr> <tr> + <td><strong>greedy()</strong></td> + <td> + <p>Specifies that this pattern is greedy, i.e. it will repeat as many as possible. This is only applicable + to quantifiers and it does not support group pattern currently.</p> +{% highlight scala %} +pattern.oneOrMore().greedy() +{% endhighlight %} + </td> + </tr> + <tr> <td><strong>consecutive()</strong><a name="consecutive_scala"></a></td> <td> <p>Works in conjunction with <code>oneOrMore()</code> and <code>times()</code> and imposes strict contiguity between the matching http://git-wip-us.apache.org/repos/asf/flink/blob/d21d5d63/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala index 5b41b90..dba328c 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala @@ -353,6 +353,18 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) { } /** + * Specifies that this pattern is greedy. + * This means as many events as possible will be matched to this pattern. + * + * @return The same pattern with { @link Quantifier#greedy} set to true. + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + */ + def greedy: Pattern[T, F] = { + jPattern.greedy() + this + } + + /** * Specifies exact number of times that this pattern should be matched. * * @param times number of times matching event must appear http://git-wip-us.apache.org/repos/asf/flink/blob/d21d5d63/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java index bb61e09..e2fd900 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java @@ -76,6 +76,10 @@ public class StateTransition<T> implements Serializable { return newCondition; } + public void setCondition(IterativeCondition<T> condition) { + this.newCondition = condition; + } + @Override public boolean equals(Object obj) { if (obj instanceof StateTransition) { http://git-wip-us.apache.org/repos/asf/flink/blob/d21d5d63/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java index 4d4baca..593c94f 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java @@ -117,6 +117,7 @@ public class NFACompiler { private Map<GroupPattern<T, ?>, Boolean> firstOfLoopMap = new HashMap<>(); private Pattern<T, ?> currentPattern; private Pattern<T, ?> followingPattern; + private Map<String, State<T>> originalStateMap = new HashMap<>(); NFAFactoryCompiler(final Pattern<T, ?> pattern) { this.currentPattern = pattern; @@ -382,6 +383,21 @@ public class NFACompiler { return copyOfSink; } + private State<T> copy(final State<T> state) { + final State<T> copyOfState = createState( + NFAStateNameHandler.getOriginalNameFromInternal(state.getName()), + state.getStateType()); + for (StateTransition<T> tStateTransition : state.getStateTransitions()) { + copyOfState.addStateTransition( + tStateTransition.getAction(), + tStateTransition.getTargetState().equals(tStateTransition.getSourceState()) + ? copyOfState + : tStateTransition.getTargetState(), + tStateTransition.getCondition()); + } + return copyOfState; + } + private void addStopStates(final State<T> state) { for (Tuple2<IterativeCondition<T>, String> notCondition: getCurrentNotCondition()) { final State<T> stopState = createStopState(notCondition.f0, notCondition.f1); @@ -421,6 +437,15 @@ public class NFACompiler { untilCondition, true); + if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY) && + times.getFrom() != times.getTo()) { + if (untilCondition != null) { + State<T> sinkStateCopy = copy(sinkState); + originalStateMap.put(sinkState.getName(), sinkStateCopy); + } + updateWithGreedyCondition(sinkState, takeCondition); + } + for (int i = times.getFrom(); i < times.getTo(); i++) { lastSink = createSingletonState(lastSink, proceedState, takeCondition, innerIgnoreCondition, true); addStopStateToLooping(lastSink); @@ -526,18 +551,32 @@ public class NFACompiler { return createGroupPatternState((GroupPattern) currentPattern, sinkState, proceedState, isOptional); } - final IterativeCondition<T> trueFunction = getTrueFunction(); - final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal); // if event is accepted then all notPatterns previous to the optional states are no longer valid final State<T> sink = copyWithoutTransitiveNots(sinkState); singletonState.addTake(sink, takeCondition); + // if no element accepted the previous nots are still valid. + final IterativeCondition<T> proceedCondition = getTrueFunction(); + // for the first state of a group pattern, its PROCEED edge should point to the following state of // that group pattern and the edge will be added at the end of creating the NFA for that group pattern if (isOptional && !headOfGroup(currentPattern)) { - // if no element accepted the previous nots are still valid. - singletonState.addProceed(proceedState, trueFunction); + if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) { + final IterativeCondition<T> untilCondition = + (IterativeCondition<T>) currentPattern.getUntilCondition(); + if (untilCondition != null) { + singletonState.addProceed( + originalStateMap.get(proceedState.getName()), + new AndCondition<>(proceedCondition, untilCondition)); + } + singletonState.addProceed(proceedState, + untilCondition != null + ? new AndCondition<>(proceedCondition, new NotCondition<>(untilCondition)) + : proceedCondition); + } else { + singletonState.addProceed(proceedState, proceedCondition); + } } if (ignoreCondition != null) { @@ -569,11 +608,12 @@ public class NFACompiler { final State<T> sinkState, final State<T> proceedState, final boolean isOptional) { - final IterativeCondition<T> trueFunction = getTrueFunction(); + final IterativeCondition<T> proceedCondition = getTrueFunction(); Pattern<T, ?> oldCurrentPattern = currentPattern; Pattern<T, ?> oldFollowingPattern = followingPattern; GroupPattern<T, ?> oldGroupPattern = currentGroupPattern; + State<T> lastSink = sinkState; currentGroupPattern = groupPattern; currentPattern = groupPattern.getRawPattern(); @@ -582,7 +622,7 @@ public class NFACompiler { if (isOptional) { // for the first state of a group pattern, its PROCEED edge should point to // the following state of that group pattern - lastSink.addProceed(proceedState, trueFunction); + lastSink.addProceed(proceedState, proceedCondition); } currentPattern = oldCurrentPattern; followingPattern = oldFollowingPattern; @@ -600,19 +640,20 @@ public class NFACompiler { private State<T> createLoopingGroupPatternState( final GroupPattern<T, ?> groupPattern, final State<T> sinkState) { - final IterativeCondition<T> trueFunction = getTrueFunction(); + final IterativeCondition<T> proceedCondition = getTrueFunction(); Pattern<T, ?> oldCurrentPattern = currentPattern; Pattern<T, ?> oldFollowingPattern = followingPattern; GroupPattern<T, ?> oldGroupPattern = currentGroupPattern; + final State<T> dummyState = createState(currentPattern.getName(), State.StateType.Normal); State<T> lastSink = dummyState; currentGroupPattern = groupPattern; currentPattern = groupPattern.getRawPattern(); lastSink = createMiddleStates(lastSink); lastSink = convertPattern(lastSink); - lastSink.addProceed(sinkState, trueFunction); - dummyState.addProceed(lastSink, trueFunction); + lastSink.addProceed(sinkState, proceedCondition); + dummyState.addProceed(lastSink, proceedCondition); currentPattern = oldCurrentPattern; followingPattern = oldFollowingPattern; currentGroupPattern = oldGroupPattern; @@ -643,9 +684,23 @@ public class NFACompiler { untilCondition, true); - final IterativeCondition<T> proceedCondition = getTrueFunction(); + IterativeCondition<T> proceedCondition = getTrueFunction(); final State<T> loopingState = createState(currentPattern.getName(), State.StateType.Normal); - loopingState.addProceed(sinkState, proceedCondition); + + if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) { + if (untilCondition != null) { + State<T> sinkStateCopy = copy(sinkState); + loopingState.addProceed(sinkStateCopy, new AndCondition<>(proceedCondition, untilCondition)); + originalStateMap.put(sinkState.getName(), sinkStateCopy); + } + loopingState.addProceed(sinkState, + untilCondition != null + ? new AndCondition<>(proceedCondition, new NotCondition<>(untilCondition)) + : proceedCondition); + updateWithGreedyCondition(sinkState, getTakeCondition(currentPattern)); + } else { + loopingState.addProceed(sinkState, proceedCondition); + } loopingState.addTake(takeCondition); addStopStateToLooping(loopingState); @@ -791,6 +846,15 @@ public class NFACompiler { } return trueCondition; } + + private void updateWithGreedyCondition( + State<T> state, + IterativeCondition<T> takeCondition) { + for (StateTransition<T> stateTransition : state.getStateTransitions()) { + stateTransition.setCondition( + new AndCondition<>(stateTransition.getCondition(), new NotCondition<>(takeCondition))); + } + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/d21d5d63/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index adf1397..33574b3 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -312,6 +312,7 @@ public class Pattern<T, F extends T> { * @throws MalformedPatternException if the quantifier is not applicable to this pattern. */ public Pattern<T, F> optional() { + checkIfPreviousPatternGreedy(); quantifier.optional(); return this; } @@ -338,6 +339,20 @@ public class Pattern<T, F extends T> { } /** + * Specifies that this pattern is greedy. + * This means as many events as possible will be matched to this pattern. + * + * @return The same pattern with {@link Quantifier#greedy} set to true. + * @throws MalformedPatternException if the quantifier is not applicable to this pattern. + */ + public Pattern<T, F> greedy() { + checkIfNoNotPattern(); + checkIfNoGroupPattern(); + this.quantifier.greedy(); + return this; + } + + /** * Specifies exact number of times that this pattern should be matched. * * @param times number of times matching event must appear @@ -509,4 +524,16 @@ public class Pattern<T, F extends T> { "Current quantifier is: " + quantifier); } } + + private void checkIfNoGroupPattern() { + if (this instanceof GroupPattern) { + throw new MalformedPatternException("Option not applicable to group pattern"); + } + } + + private void checkIfPreviousPatternGreedy() { + if (previous != null && previous.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) { + throw new MalformedPatternException("Optional pattern cannot be preceded by greedy pattern"); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d21d5d63/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java index 2136706..b55051d 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java @@ -105,6 +105,15 @@ public class Quantifier { properties.add(Quantifier.QuantifierProperty.OPTIONAL); } + public void greedy() { + checkPattern(!(innerConsumingStrategy == ConsumingStrategy.SKIP_TILL_ANY), + "Option not applicable to FollowedByAny pattern"); + checkPattern(!hasProperty(Quantifier.QuantifierProperty.SINGLE), + "Option not applicable to singleton quantifier"); + + properties.add(QuantifierProperty.GREEDY); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -130,7 +139,8 @@ public class Quantifier { SINGLE, LOOPING, TIMES, - OPTIONAL + OPTIONAL, + GREEDY } /** http://git-wip-us.apache.org/repos/asf/flink/blob/d21d5d63/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java new file mode 100644 index 0000000..2c7f23c --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java @@ -0,0 +1,907 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link Pattern#greedy()}. + */ +public class GreedyITCase extends TestLogger { + + @Test + public void testGreedyZeroOrMore() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event a3 = new Event(43, "a", 2.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(a3, 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + + // c a* d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, a1, a2, a3, d) + )); + } + + @Test + public void testGreedyZeroOrMoreInBetween() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event a3 = new Event(43, "a", 2.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2)); + inputEvents.add(new StreamRecord<>(a1, 3)); + inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4)); + inputEvents.add(new StreamRecord<>(a2, 5)); + inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6)); + inputEvents.add(new StreamRecord<>(a3, 7)); + inputEvents.add(new StreamRecord<>(d, 8)); + + // c a* d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, a1, a2, a3, d) + )); + } + + @Test + public void testGreedyZeroOrMoreWithDummyEventsAfterQuantifier() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + + // c a* d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, a1, a2, d) + )); + } + + @Test + public void testGreedyZeroOrMoreWithDummyEventsBeforeQuantifier() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2)); + inputEvents.add(new StreamRecord<>(d, 5)); + + // c a* d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, d) + )); + } + + @Test + public void testGreedyUntilZeroOrMoreWithDummyEventsAfterQuantifier() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 3.0); + Event a3 = new Event(43, "a", 3.0); + Event d = new Event(45, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(a3, 4)); + inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 5)); + inputEvents.add(new StreamRecord<>(d, 6)); + + // c a* d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().greedy().until(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getPrice() > 3.0; + } + }).followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, a1, a2, a3, d) + )); + } + + @Test + public void testGreedyUntilWithDummyEventsBeforeQuantifier() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 3.0); + Event a3 = new Event(43, "a", 3.0); + Event d = new Event(45, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 2)); + inputEvents.add(new StreamRecord<>(a1, 3)); + inputEvents.add(new StreamRecord<>(a2, 4)); + inputEvents.add(new StreamRecord<>(a3, 5)); + inputEvents.add(new StreamRecord<>(d, 6)); + + // c a* d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().greedy().until(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getPrice() > 3.0; + } + }).followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, d) + )); + } + + @Test + public void testGreedyOneOrMore() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event a3 = new Event(43, "a", 2.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(a3, 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + + // c a+ d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, a1, a2, a3, d) + )); + } + + @Test + public void testGreedyOneOrMoreInBetween() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event a3 = new Event(43, "a", 2.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2)); + inputEvents.add(new StreamRecord<>(a1, 3)); + inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4)); + inputEvents.add(new StreamRecord<>(a2, 5)); + inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6)); + inputEvents.add(new StreamRecord<>(a3, 7)); + inputEvents.add(new StreamRecord<>(d, 8)); + + // c a+ d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, a1, a2, a3, d) + )); + } + + @Test + public void testGreedyOneOrMoreWithDummyEventsAfterQuantifier() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + + // c a+ d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, a1, a2, d) + )); + } + + @Test + public void testGreedyOneOrMoreWithDummyEventsBeforeQuantifier() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2)); + inputEvents.add(new StreamRecord<>(d, 5)); + + // c a+ d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().greedy().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList()); + } + + @Test + public void testGreedyUntilOneOrMoreWithDummyEventsAfterQuantifier() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 3.0); + Event a3 = new Event(43, "a", 3.0); + Event d = new Event(45, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(a3, 4)); + inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 5)); + inputEvents.add(new StreamRecord<>(d, 6)); + + // c a+ d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().greedy().until(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getPrice() > 3.0; + } + }).followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, a1, a2, a3, d) + )); + } + + @Test + public void testGreedyUntilOneOrMoreWithDummyEventsBeforeQuantifier() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 3.0); + Event a3 = new Event(43, "a", 3.0); + Event d = new Event(45, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(new Event(44, "a", 4.0), 2)); + inputEvents.add(new StreamRecord<>(a1, 3)); + inputEvents.add(new StreamRecord<>(a2, 4)); + inputEvents.add(new StreamRecord<>(a3, 5)); + inputEvents.add(new StreamRecord<>(d, 6)); + + // c a+ d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().greedy().until(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getPrice() > 3.0; + } + }).followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList()); + } + + @Test + public void testGreedyZeroOrMoreBeforeGroupPattern() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(40, "a", 1.0); + Event a2 = new Event(40, "a", 1.0); + Event a3 = new Event(40, "a", 1.0); + Event d1 = new Event(40, "d", 1.0); + Event e1 = new Event(40, "e", 1.0); + Event d2 = new Event(40, "d", 1.0); + Event e2 = new Event(40, "e", 1.0); + Event f = new Event(44, "f", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4)); + inputEvents.add(new StreamRecord<>(a3, 5)); + inputEvents.add(new StreamRecord<>(d1, 6)); + inputEvents.add(new StreamRecord<>(e1, 7)); + inputEvents.add(new StreamRecord<>(d2, 8)); + inputEvents.add(new StreamRecord<>(e2, 9)); + inputEvents.add(new StreamRecord<>(f, 10)); + + // c a* (d e){2} f + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().greedy().followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }).followedBy("middle2").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("e"); + } + })).times(2).followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("f"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, a1, a2, a3, d1, e1, d2, e2, f) + )); + } + + @Test + public void testEndWithZeroOrMoreGreedy() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event a3 = new Event(43, "a", 2.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(new Event(44, "dummy", 2.0), 4)); + inputEvents.add(new StreamRecord<>(a3, 5)); + + // c a* + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().greedy(); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c), + Lists.newArrayList(c, a1), + Lists.newArrayList(c, a1, a2), + Lists.newArrayList(c, a1, a2, a3) + )); + } + + @Test + public void testEndWithZeroOrMoreConsecutiveGreedy() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event a3 = new Event(43, "a", 2.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(new Event(44, "dummy", 2.0), 4)); + inputEvents.add(new StreamRecord<>(a3, 5)); + + // c a* + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().consecutive().greedy(); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c), + Lists.newArrayList(c, a1), + Lists.newArrayList(c, a1, a2) + )); + } + + @Test + public void testEndWithGreedyTimesRange() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event a3 = new Event(43, "a", 2.0); + Event a4 = new Event(44, "a", 2.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(a3, 4)); + inputEvents.add(new StreamRecord<>(a4, 5)); + inputEvents.add(new StreamRecord<>(new Event(44, "dummy", 2.0), 6)); + + // c a{2, 5} + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2, 5).greedy(); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, a1, a2), + Lists.newArrayList(c, a1, a2, a3), + Lists.newArrayList(c, a1, a2, a3, a4) + )); + } + + @Test + public void testGreedyTimesRange() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event a3 = new Event(43, "a", 2.0); + Event a4 = new Event(44, "a", 2.0); + Event d = new Event(45, "d", 2.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(a3, 4)); + inputEvents.add(new StreamRecord<>(a4, 5)); + inputEvents.add(new StreamRecord<>(d, 6)); + + // c a{2, 5} d + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(2, 5).greedy().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(c, a1, a2, a3, a4, d) + )); + } +}