[
https://issues.apache.org/jira/browse/FLINK-6904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16049169#comment-16049169
]
ASF GitHub Bot commented on FLINK-6904:
---------------------------------------
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/4121#discussion_r121943356
--- Diff:
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
---
@@ -1905,6 +2147,275 @@ public boolean filter(Event value) throws Exception
{
));
}
+ @Test
+ public void testTimesRangeNonStrictOptional1() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent,
1));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ Pattern<Event, ?> pattern =
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID =
5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID =
5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(1, 3).optional().followedBy("end1").where(new
SimpleCondition<Event>() {
+ private static final long serialVersionUID =
5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern,
Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent,
ConsecutiveData.end)
+ ));
+ }
+
+ @Test
+ public void testTimesRangeNonStrictOptional2() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent,
1));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+ inputEvents.add(new
StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+ inputEvents.add(new
StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+ inputEvents.add(new
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ Pattern<Event, ?> pattern =
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID =
5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID =
5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(2,
3).allowCombinations().optional().followedBy("end1").where(new
SimpleCondition<Event>() {
+ private static final long serialVersionUID =
5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern,
Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent,
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2,
ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent,
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2,
ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent,
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3,
ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent,
ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3,
ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent,
ConsecutiveData.end)
+ ));
+ }
+
+ @Test
+ public void testTimesRangeNonStrictOptional3() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent,
1));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+ inputEvents.add(new
StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+ inputEvents.add(new
StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+ inputEvents.add(new
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ Pattern<Event, ?> pattern =
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID =
5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID =
5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(2, 3).optional().followedBy("end1").where(new
SimpleCondition<Event>() {
+ private static final long serialVersionUID =
5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern,
Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent,
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2,
ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent,
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2,
ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent,
ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3,
ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent,
ConsecutiveData.end)
+ ));
+ }
+
+ @Test
+ public void testTimesRangeNonStrictWithNext() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent,
1));
+ inputEvents.add(new
StreamRecord<>(ConsecutiveData.middleEvent1, 2));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3));
+ inputEvents.add(new
StreamRecord<>(ConsecutiveData.middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 5));
+ inputEvents.add(new
StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ Pattern<Event, ?> pattern =
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID =
5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).next("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID =
5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(2, 3).allowCombinations().followedBy("end1").where(new
SimpleCondition<Event>() {
+ private static final long serialVersionUID =
5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern,
Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent,
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2,
ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent,
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2,
ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent,
ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end)
+ ));
+ }
+
+ @Test
+ public void testTimesRangeNotStrictWithFollowedByEager() {
--- End diff --
Maybe change the name to `testTimesRangeNotStrictWithFollowedBy`. I think
eager may be misleading here. What do you think?
> Support for quantifier range to CEP's pattern API
> -------------------------------------------------
>
> Key: FLINK-6904
> URL: https://issues.apache.org/jira/browse/FLINK-6904
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Reporter: Dian Fu
> Assignee: Dian Fu
>
> Currently the quantifier has supported oneOrMore, times(int times), one(),we
> should also support API such as times(int from, int to) to specify a
> quantifier range.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)