[FLINK-6609] [cep] Fix wrong version assignment with multiple TAKEs.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00ce3f1b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00ce3f1b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00ce3f1b Branch: refs/heads/master Commit: 00ce3f1b12c7d7bf996d5f91bf006f0e18a719e7 Parents: 7a54d05 Author: Dawid Wysakowicz <da...@getindata.com> Authored: Wed May 17 09:16:08 2017 +0200 Committer: kkloudas <kklou...@gmail.com> Committed: Wed May 17 14:37:35 2017 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/flink/cep/nfa/NFA.java | 11 ++- .../org/apache/flink/cep/nfa/NFAITCase.java | 82 ++++++++++++++++++++ 2 files changed, 89 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/00ce3f1b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index ab5cd8e..a977a7f 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -418,6 +418,7 @@ public class NFA<T> implements Serializable { final List<StateTransition<T>> edges = outgoingEdges.getEdges(); int takeBranchesToVisit = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1); int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches(); + int totalTakeToSkip = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1); final List<ComputationState<T>> resultingComputationStates = new ArrayList<>(); for (StateTransition<T> edge : edges) { @@ -433,7 +434,9 @@ public class NFA<T> implements Serializable { version = computationState.getVersion().increase(toIncrease); } else { //IGNORE after PROCEED - version = computationState.getVersion().increase(ignoreBranchesToVisit).addStage(); + version = computationState.getVersion() + .increase(totalTakeToSkip + ignoreBranchesToVisit) + .addStage(); ignoreBranchesToVisit--; } @@ -457,8 +460,8 @@ public class NFA<T> implements Serializable { final T previousEvent = computationState.getEvent(); - final DeweyNumber currentVersion = computationState.getVersion(); - final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit); + final DeweyNumber currentVersion = computationState.getVersion().increase(takeBranchesToVisit); + final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage(); takeBranchesToVisit--; final int counter; @@ -573,7 +576,7 @@ public class NFA<T> implements Serializable { } private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) { - return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + 1; + return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + Math.max(1, takeBranches); } private OutgoingEdges<T> createDecisionGraph(ComputationState<T> computationState, T event) { http://git-wip-us.apache.org/repos/asf/flink/blob/00ce3f1b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index 012e112..d00bbb7 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -3974,6 +3974,88 @@ public class NFAITCase extends TestLogger { } @Test + public void testMultipleTakesVersionCollision() { + List<StreamRecord<Event>> inputEvents = new ArrayList<>(); + + Event startEvent = new Event(40, "c", 1.0); + Event middleEvent1 = new Event(41, "a", 2.0); + Event middleEvent2 = new Event(41, "a", 3.0); + Event middleEvent3 = new Event(41, "a", 4.0); + Event middleEvent4 = new Event(41, "a", 5.0); + Event middleEvent5 = new Event(41, "a", 6.0); + Event end = new Event(44, "b", 5.0); + + inputEvents.add(new StreamRecord<>(startEvent, 1)); + inputEvents.add(new StreamRecord<>(middleEvent1, 3)); + inputEvents.add(new StreamRecord<>(middleEvent2, 4)); + inputEvents.add(new StreamRecord<>(middleEvent3, 5)); + inputEvents.add(new StreamRecord<>(middleEvent4, 6)); + inputEvents.add(new StreamRecord<>(middleEvent5, 7)); + inputEvents.add(new StreamRecord<>(end, 10)); + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle1").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().allowCombinations().followedBy("middle2").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().allowCombinations().followedBy("end").where(new SimpleCondition<Event>() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + }); + + NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( + + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end), + + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent4, middleEvent5, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent5, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent4, middleEvent5, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent4, middleEvent5, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent5, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent4, middleEvent5, end), + + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent4, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent4, middleEvent5, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent4, end), + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent5, end), + + Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end) + )); + } + + @Test public void testZeroOrMoreSameElement() { List<StreamRecord<Event>> inputEvents = new ArrayList<>();