Kostas Kloudas created FLINK-6578:
-------------------------------------
Summary: SharedBuffer creates self-loops when having elements with
same value/timestamp.
Key: FLINK-6578
URL: https://issues.apache.org/jira/browse/FLINK-6578
Project: Flink
Issue Type: Bug
Components: CEP
Affects Versions: 1.3.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
Fix For: 1.3.0
The following test fails with the current implementation because the looping
state accepts the repeated {{middleEvent1}} elements (same value, same
timestamp), but the shared buffer cannot distinguish between them. It gets
trapped in an infinite loop and eventually runs out of memory.
{code}
/**
 * Reproducer for FLINK-6578: the very same event instance is emitted several
 * times with an identical timestamp while the pattern is in its looping
 * ("middle") state.
 *
 * <p>Pattern under test: {@code start("c") followedBy middle("a")
 * oneOrMore/optional followedBy end1("b")}. Seven matches are expected,
 * ranging from the longest chain of "a" events down to the bare
 * start/end pair.
 */
@Test
public void testEagerZeroOrMoreSameElement() {
    // --- events -----------------------------------------------------------
    final Event start = new Event(40, "c", 1.0);
    final Event firstA = new Event(41, "a", 2.0);
    final Event secondA = new Event(42, "a", 3.0);
    final Event thirdA = new Event(43, "a", 4.0);
    final Event end = new Event(44, "b", 5.0);

    // --- input stream (order and timestamps matter) -----------------------
    final List<StreamRecord<Event>> stream = new ArrayList<>();
    stream.add(new StreamRecord<>(start, 1));
    // Same instance, same timestamp, three times in a row.
    stream.add(new StreamRecord<>(firstA, 3));
    stream.add(new StreamRecord<>(firstA, 3));
    stream.add(new StreamRecord<>(firstA, 3));
    stream.add(new StreamRecord<>(secondA, 4));
    // An event that matches none of the conditions below.
    stream.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
    // Same instance, same timestamp, twice.
    stream.add(new StreamRecord<>(thirdA, 6));
    stream.add(new StreamRecord<>(thirdA, 6));
    stream.add(new StreamRecord<>(end, 7));

    // --- conditions -------------------------------------------------------
    final SimpleCondition<Event> nameIsC = new SimpleCondition<Event>() {
        private static final long serialVersionUID = 5726188262756267490L;

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("c");
        }
    };
    final SimpleCondition<Event> nameIsA = new SimpleCondition<Event>() {
        private static final long serialVersionUID = 5726188262756267490L;

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("a");
        }
    };
    final SimpleCondition<Event> nameIsB = new SimpleCondition<Event>() {
        private static final long serialVersionUID = 5726188262756267490L;

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("b");
        }
    };

    // --- pattern ----------------------------------------------------------
    final Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
        .where(nameIsC)
        .followedBy("middle")
        .where(nameIsA)
        .oneOrMore()
        .optional()
        .followedBy("end1")
        .where(nameIsB);

    // --- run --------------------------------------------------------------
    final NFA<Event> compiledNfa =
        NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    final List<List<Event>> matches = feedNFA(stream, compiledNfa);

    // --- verify -----------------------------------------------------------
    final List<List<Event>> expected = Lists.<List<Event>>newArrayList(
        Lists.newArrayList(start, firstA, firstA, firstA, secondA, thirdA, thirdA, end),
        Lists.newArrayList(start, firstA, firstA, firstA, secondA, thirdA, end),
        Lists.newArrayList(start, firstA, firstA, firstA, secondA, end),
        Lists.newArrayList(start, firstA, firstA, firstA, end),
        Lists.newArrayList(start, firstA, firstA, end),
        Lists.newArrayList(start, firstA, end),
        Lists.newArrayList(start, end)
    );
    compareMaps(matches, expected);
}
{code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)