[FLINK-6609] [cep] Fix wrong version assignment with multiple TAKEs.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00ce3f1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00ce3f1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00ce3f1b

Branch: refs/heads/master
Commit: 00ce3f1b12c7d7bf996d5f91bf006f0e18a719e7
Parents: 7a54d05
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Wed May 17 09:16:08 2017 +0200
Committer: kkloudas <kklou...@gmail.com>
Committed: Wed May 17 14:37:35 2017 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 11 ++-
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 82 ++++++++++++++++++++
 2 files changed, 89 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/00ce3f1b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index ab5cd8e..a977a7f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -418,6 +418,7 @@ public class NFA<T> implements Serializable {
                final List<StateTransition<T>> edges = outgoingEdges.getEdges();
                int takeBranchesToVisit = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1);
                int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches();
+               int totalTakeToSkip = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1);
 
                final List<ComputationState<T>> resultingComputationStates = new ArrayList<>();
                for (StateTransition<T> edge : edges) {
@@ -433,7 +434,9 @@ public class NFA<T> implements Serializable {
                                                        version = computationState.getVersion().increase(toIncrease);
                                                } else {
                                                        //IGNORE after PROCEED
-                                                       version = computationState.getVersion().increase(ignoreBranchesToVisit).addStage();
+                                                       version = computationState.getVersion()
+                                                               .increase(totalTakeToSkip + ignoreBranchesToVisit)
+                                                               .addStage();
                                                        ignoreBranchesToVisit--;
                                                }
 
@@ -457,8 +460,8 @@ public class NFA<T> implements Serializable {
 
                                        final T previousEvent = computationState.getEvent();
 
-                                       final DeweyNumber currentVersion = computationState.getVersion();
-                                       final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
+                                       final DeweyNumber currentVersion = computationState.getVersion().increase(takeBranchesToVisit);
+                                       final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage();
                                        takeBranchesToVisit--;
 
                                        final int counter;
@@ -573,7 +576,7 @@ public class NFA<T> implements Serializable {
        }
 
        private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
-               return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + 1;
+               return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + Math.max(1, takeBranches);
        }
 
        private OutgoingEdges<T> createDecisionGraph(ComputationState<T> computationState, T event) {

http://git-wip-us.apache.org/repos/asf/flink/blob/00ce3f1b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 012e112..d00bbb7 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -3974,6 +3974,88 @@ public class NFAITCase extends TestLogger {
        }
 
        @Test
+       public void testMultipleTakesVersionCollision() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               Event startEvent = new Event(40, "c", 1.0);
+               Event middleEvent1 = new Event(41, "a", 2.0);
+               Event middleEvent2 = new Event(41, "a", 3.0);
+               Event middleEvent3 = new Event(41, "a", 4.0);
+               Event middleEvent4 = new Event(41, "a", 5.0);
+               Event middleEvent5 = new Event(41, "a", 6.0);
+               Event end = new Event(44, "b", 5.0);
+
+               inputEvents.add(new StreamRecord<>(startEvent, 1));
+               inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+               inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+               inputEvents.add(new StreamRecord<>(middleEvent4, 6));
+               inputEvents.add(new StreamRecord<>(middleEvent5, 7));
+               inputEvents.add(new StreamRecord<>(end, 10));
+
+               Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle1").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).oneOrMore().allowCombinations().followedBy("middle2").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).oneOrMore().allowCombinations().followedBy("end").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("b");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+               final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
+
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent4, middleEvent5, end),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent5, end),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent4, middleEvent5, end),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent4, middleEvent5, end),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent5, end),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent4, middleEvent5, end),
+
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent4, end),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent4, middleEvent5, end),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent4, end),
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent5, end),
+
+                       Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end)
+                       ));
+       }
+
+       @Test
        public void testZeroOrMoreSameElement() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 

Reply via email to