Dawid Wysakowicz created FLINK-6609:
---------------------------------------

             Summary: Wrong version assignment when multiple TAKEs transitions
                 Key: FLINK-6609
                 URL: https://issues.apache.org/jira/browse/FLINK-6609
             Project: Flink
          Issue Type: Bug
          Components: CEP
    Affects Versions: 1.3.0
            Reporter: Dawid Wysakowicz
            Assignee: Dawid Wysakowicz
            Priority: Blocker
             Fix For: 1.3.0


This test fails due to wrong version assignment for TAKEs from the same state.

{code}
@Test
        public void testMultipleTakesVersionCollision() {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();

                Event startEvent = new Event(40, "c", 1.0);
                Event middleEvent1 = new Event(41, "a", 2.0);
                Event middleEvent2 = new Event(41, "a", 3.0);
                Event middleEvent3 = new Event(41, "a", 4.0);
                Event middleEvent4 = new Event(41, "a", 5.0);
                Event middleEvent5 = new Event(41, "a", 6.0);
                Event end = new Event(44, "b", 5.0);

                inputEvents.add(new StreamRecord<>(startEvent, 1));
                inputEvents.add(new StreamRecord<>(middleEvent1, 3));
                inputEvents.add(new StreamRecord<>(middleEvent2, 4));
                inputEvents.add(new StreamRecord<>(middleEvent3, 5));
                inputEvents.add(new StreamRecord<>(middleEvent4, 6));
                inputEvents.add(new StreamRecord<>(middleEvent5, 7));
                inputEvents.add(new StreamRecord<>(end, 10));

                Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;

                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("c");
                        }
                }).followedBy("middle1").where(new SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;

                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
                
}).oneOrMore().allowCombinations().followedBy("middle2").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;

                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("a");
                        }
                }).oneOrMore().allowCombinations().followedBy("end").where(new 
SimpleCondition<Event>() {
                        private static final long serialVersionUID = 
5726188262756267490L;

                        @Override
                        public boolean filter(Event value) throws Exception {
                                return value.getName().equals("b");
                        }
                });

                NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);

                final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);

                compareMaps(resultingPatterns, Lists.newArrayList(

                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),

                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent3, middleEvent4, end),
                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent4, middleEvent5, end),
                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent3, middleEvent4, end),
                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent3, middleEvent5, end),
                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent3, middleEvent4, middleEvent5, end),
                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent3, middleEvent4, middleEvent5, end),
                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent3, middleEvent4, end),
                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent3, middleEvent5, end),
                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent4, middleEvent5, end),

                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent3, end),
                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent3, middleEvent4, end),
                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent4, middleEvent5, end),
                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent3, end),
                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent4, end),
                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, middleEvent5, end),

                        Lists.newArrayList(startEvent, middleEvent1, 
middleEvent2, end)
                        ));
        }
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to