Repository: flink Updated Branches: refs/heads/master b13914799 -> 449c84b0e
[FLINK-7170] [cep] Fix until condition when the contiguity is strict This closes #4318. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/449c84b0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/449c84b0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/449c84b0 Branch: refs/heads/master Commit: 449c84b0e21c6cfe43ab9ed697a69fa09308b381 Parents: b139147 Author: Dian Fu <fudian...@alibaba-inc.com> Authored: Thu Jul 13 17:21:30 2017 +0800 Committer: Dawid Wysakowicz <dwysakow...@apache.org> Committed: Tue Jul 25 14:30:59 2017 +0200 ---------------------------------------------------------------------- .../flink/cep/nfa/compiler/NFACompiler.java | 30 ++++++++----- .../flink/cep/nfa/UntilConditionITCase.java | 47 ++++++++++++++++++++ 2 files changed, 67 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/449c84b0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java index e160e4a..62464d1 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java @@ -630,10 +630,12 @@ public class NFACompiler { final IterativeCondition<T> ignoreCondition = extendWithUntilCondition( getInnerIgnoreCondition(currentPattern), - untilCondition); + untilCondition, + false); final IterativeCondition<T> takeCondition = extendWithUntilCondition( getTakeCondition(currentPattern), - untilCondition); + untilCondition, + true); final IterativeCondition<T> proceedCondition = getTrueFunction(); final State<T> loopingState = createState(currentPattern.getName(), State.StateType.Normal); @@ -664,7 +666,8 @@ public class NFACompiler { private State<T> createInitMandatoryStateOfOneOrMore(final State<T> sinkState) { final IterativeCondition<T> takeCondition = extendWithUntilCondition( getTakeCondition(currentPattern), - (IterativeCondition<T>) currentPattern.getUntilCondition() + (IterativeCondition<T>) currentPattern.getUntilCondition(), + true ); final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern); @@ -683,7 +686,8 @@ public class NFACompiler { private State<T> createInitOptionalStateOfZeroOrMore(final State<T> loopingState, final State<T> lastSink) { final IterativeCondition<T> takeCondition = extendWithUntilCondition( getTakeCondition(currentPattern), - (IterativeCondition<T>) currentPattern.getUntilCondition() + (IterativeCondition<T>) currentPattern.getUntilCondition(), + true ); final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern); @@ -697,14 +701,16 @@ public class NFACompiler { * * @param condition the condition to extend * @param untilCondition the until condition to join with the given condition + * @param isTakeCondition whether the {@code condition} is for {@code TAKE} edge * @return condition with AND applied or the original condition */ private IterativeCondition<T> extendWithUntilCondition( IterativeCondition<T> condition, - IterativeCondition<T> untilCondition) { + IterativeCondition<T> untilCondition, + boolean isTakeCondition) { if (untilCondition != null && condition != null) { return new AndCondition<>(new NotCondition<>(untilCondition), condition); - } else if (untilCondition != null) { + } else if (untilCondition != null && isTakeCondition) { return new NotCondition<>(untilCondition); } @@ -741,7 +747,8 @@ public class NFACompiler { if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) { innerIgnoreCondition = extendWithUntilCondition( innerIgnoreCondition, - (IterativeCondition<T>) currentGroupPattern.getUntilCondition()); + (IterativeCondition<T>) currentGroupPattern.getUntilCondition(), + false); } return innerIgnoreCondition; } @@ -781,7 +788,8 @@ public class NFACompiler { if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) { ignoreCondition = extendWithUntilCondition( ignoreCondition, - (IterativeCondition<T>) currentGroupPattern.getUntilCondition()); + (IterativeCondition<T>) currentGroupPattern.getUntilCondition(), + false); } return ignoreCondition; } @@ -797,7 +805,8 @@ public class NFACompiler { if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) { takeCondition = extendWithUntilCondition( takeCondition, - (IterativeCondition<T>) currentGroupPattern.getUntilCondition()); + (IterativeCondition<T>) currentGroupPattern.getUntilCondition(), + true); } return takeCondition; } @@ -811,7 +820,8 @@ public class NFACompiler { if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) { trueCondition = extendWithUntilCondition( trueCondition, - (IterativeCondition<T>) currentGroupPattern.getUntilCondition()); + (IterativeCondition<T>) currentGroupPattern.getUntilCondition(), + true); } return trueCondition; } http://git-wip-us.apache.org/repos/asf/flink/blob/449c84b0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java index d56e883..639541d 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java @@ -194,6 +194,53 @@ public class UntilConditionITCase { } @Test + public void testUntilConditionFollowedByOneOrMoreConsecutive2() throws Exception { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(42, "b", 3.0); + Event middleEvent3 = new Event(43, "a", 4.0); + Event breaking = new Event(45, "a", 5.0); + Event ignored = new Event(46, "a", 6.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(middleEvent3, 5)); + inputEvents.add(new StreamRecord<>(breaking, 7)); + inputEvents.add(new StreamRecord<>(ignored, 8)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().consecutive().until(UNTIL_CONDITION) + .followedBy("end").where( + UNTIL_CONDITION + ); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + Lists.newArrayList(startEvent, middleEvent1, breaking) + )); + assertTrue(nfa.isEmpty()); + } + + @Test public void testUntilConditionFollowedByZeroOrMore() throws Exception { List<StreamRecord<Event>> inputEvents = new ArrayList<>();