Dawid Wysakowicz created FLINK-6609:
---------------------------------------
Summary: Wrong version assignment when there are multiple TAKE transitions
Key: FLINK-6609
URL: https://issues.apache.org/jira/browse/FLINK-6609
Project: Flink
Issue Type: Bug
Components: CEP
Affects Versions: 1.3.0
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
Priority: Blocker
Fix For: 1.3.0
The following test fails because TAKE transitions that originate from the same state are assigned the wrong version.
{code}
/**
 * Reproduces FLINK-6609: wrong version assignment for TAKE transitions from the same state.
 *
 * <p>The input is one "c" event, five "a" events and one "b" event. The pattern is
 * {@code start("c") -> middle1("a")+ -> middle2("a")+ -> end("b")}, where both middle
 * stages use {@code oneOrMore().allowCombinations()}. The matches produced by the NFA are
 * compared against the explicit list of expected event sequences below.
 */
@Test
public void testMultipleTakesVersionCollision() {
    List<StreamRecord<Event>> stream = new ArrayList<>();

    // Single "c" event that opens the pattern.
    Event begin = new Event(40, "c", 1.0);
    stream.add(new StreamRecord<>(begin, 1));

    // Five "a" events; each one satisfies both the "middle1" and the "middle2" condition.
    Event a1 = new Event(41, "a", 2.0);
    stream.add(new StreamRecord<>(a1, 3));
    Event a2 = new Event(41, "a", 3.0);
    stream.add(new StreamRecord<>(a2, 4));
    Event a3 = new Event(41, "a", 4.0);
    stream.add(new StreamRecord<>(a3, 5));
    Event a4 = new Event(41, "a", 5.0);
    stream.add(new StreamRecord<>(a4, 6));
    Event a5 = new Event(41, "a", 6.0);
    stream.add(new StreamRecord<>(a5, 7));

    // Single "b" event that closes the pattern.
    Event finish = new Event(44, "b", 5.0);
    stream.add(new StreamRecord<>(finish, 10));

    Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
        .where(new SimpleCondition<Event>() {
            private static final long serialVersionUID = 5726188262756267490L;

            @Override
            public boolean filter(Event event) throws Exception {
                return event.getName().equals("c");
            }
        })
        .followedBy("middle1")
        .where(new SimpleCondition<Event>() {
            private static final long serialVersionUID = 5726188262756267490L;

            @Override
            public boolean filter(Event event) throws Exception {
                return event.getName().equals("a");
            }
        })
        .oneOrMore()
        .allowCombinations()
        .followedBy("middle2")
        .where(new SimpleCondition<Event>() {
            private static final long serialVersionUID = 5726188262756267490L;

            @Override
            public boolean filter(Event event) throws Exception {
                return event.getName().equals("a");
            }
        })
        .oneOrMore()
        .allowCombinations()
        .followedBy("end")
        .where(new SimpleCondition<Event>() {
            private static final long serialVersionUID = 5726188262756267490L;

            @Override
            public boolean filter(Event event) throws Exception {
                return event.getName().equals("b");
            }
        });

    NFA<Event> compiled = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);

    final List<List<Event>> matches = feedNFA(stream, compiled);

    // Expected matches, listed in the same order as in the original report.
    compareMaps(matches, Lists.newArrayList(
        // matches containing all five "a" events
        Lists.newArrayList(begin, a1, a2, a3, a4, a5, finish),
        Lists.newArrayList(begin, a1, a2, a3, a4, a5, finish),
        Lists.newArrayList(begin, a1, a2, a3, a4, a5, finish),
        Lists.newArrayList(begin, a1, a2, a3, a4, a5, finish),
        // matches containing four "a" events
        Lists.newArrayList(begin, a1, a2, a3, a4, finish),
        Lists.newArrayList(begin, a1, a2, a4, a5, finish),
        Lists.newArrayList(begin, a1, a2, a3, a4, finish),
        Lists.newArrayList(begin, a1, a2, a3, a5, finish),
        Lists.newArrayList(begin, a1, a3, a4, a5, finish),
        Lists.newArrayList(begin, a1, a3, a4, a5, finish),
        Lists.newArrayList(begin, a1, a2, a3, a4, finish),
        Lists.newArrayList(begin, a1, a2, a3, a5, finish),
        Lists.newArrayList(begin, a1, a2, a4, a5, finish),
        // matches containing three "a" events
        Lists.newArrayList(begin, a1, a2, a3, finish),
        Lists.newArrayList(begin, a1, a3, a4, finish),
        Lists.newArrayList(begin, a1, a4, a5, finish),
        Lists.newArrayList(begin, a1, a2, a3, finish),
        Lists.newArrayList(begin, a1, a2, a4, finish),
        Lists.newArrayList(begin, a1, a2, a5, finish),
        // match containing two "a" events
        Lists.newArrayList(begin, a1, a2, finish)
    ));
}
{code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)