[ https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16064918#comment-16064918 ]
ASF GitHub Bot commented on FLINK-7008:
---------------------------------------
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/4195#discussion_r124287999
--- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java ---
@@ -324,6 +327,83 @@ public boolean filter(Event value) throws Exception {
}
}
+    @Test
+    public void testNFAChange() {
+        Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+            private static final long serialVersionUID = 1858562682635302605L;
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("a");
+            }
+        }).notFollowedBy("not").where(new IterativeCondition<Event>() {
+            private static final long serialVersionUID = -6085237016591726715L;
+
+            @Override
+            public boolean filter(Event value, Context<Event> ctx) throws Exception {
+                return value.getName().equals("c");
+            }
+        }).followedByAny("middle").where(new IterativeCondition<Event>() {
+            private static final long serialVersionUID = 8061969839441121955L;
+
+            @Override
+            public boolean filter(Event value, Context<Event> ctx) throws Exception {
+                return value.getName().equals("b");
+            }
+        }).oneOrMore().optional().allowCombinations().followedBy("middle2").where(new IterativeCondition<Event>() {
+            private static final long serialVersionUID = 8061969839441121955L;
+
+            @Override
+            public boolean filter(Event value, Context<Event> ctx) throws Exception {
+                return value.getName().equals("d");
+            }
+        }).followedBy("end").where(new IterativeCondition<Event>() {
+            private static final long serialVersionUID = 8061969839441121955L;
+
+            @Override
+            public boolean filter(Event value, Context<Event> ctx) throws Exception {
+                return value.getName().equals("e");
+            }
+        }).within(Time.milliseconds(10));
+
+        NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true);
+        NFA<Event> nfa = nfaFactory.createNFA();
+        nfa.process(new Event(1, "b", 1.0), 1L);
+        assertFalse(nfa.isNFAChanged());
+
+        nfa.nfaChanged = false;
--- End diff --
Could you add a comment to each assertion explaining which NFA situation it covers?
> Update NFA state only when the NFA changes.
> -------------------------------------------
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Affects Versions: 1.3.1
> Reporter: Kostas Kloudas
> Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we
> update the NFA state every time the NFA is touched. This leads to redundant
> puts/gets to the state when there are no changes to the NFA itself.
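
The idea in the issue description can be illustrated with a small dirty-flag sketch. It does not use Flink: TinyNfa, CountingState, and countPuts are hypothetical names invented for this illustration, and the two-step "a then b" automaton is an assumption chosen only to keep the example short. The point is that the write to state happens only when processing an event actually changed the automaton.

```java
import java.util.HashMap;
import java.util.Map;

final class DirtyFlagSketch {

    /** Hypothetical stand-in for an NFA that records whether processing changed it. */
    static final class TinyNfa {
        private int stage = 0;           // 0 = waiting for "a", 1 = waiting for "b", 2 = matched
        private boolean changed = false; // dirty flag, set only on a real transition

        void process(String eventName) {
            if (stage == 0 && eventName.equals("a")) {
                stage = 1;
                changed = true;
            } else if (stage == 1 && eventName.equals("b")) {
                stage = 2;
                changed = true;
            }
            // Any other event leaves the automaton untouched, so the flag stays as it was.
        }

        boolean isChanged() { return changed; }
        void resetChanged() { changed = false; }
        int stage() { return stage; }
    }

    /** Hypothetical stand-in for keyed state; it only counts how often it is written. */
    static final class CountingState {
        private final Map<String, Integer> store = new HashMap<>();
        private int puts = 0;

        void put(String key, int stage) {
            store.put(key, stage);
            puts++;
        }

        int puts() { return puts; }
    }

    /** Feeds the events through the automaton and returns the number of state writes. */
    static int countPuts(String... eventNames) {
        TinyNfa nfa = new TinyNfa();
        CountingState state = new CountingState();
        for (String name : eventNames) {
            nfa.process(name);
            if (nfa.isChanged()) {       // write back only when something changed
                state.put("key", nfa.stage());
                nfa.resetChanged();
            }
        }
        return state.puts();
    }

    public static void main(String[] args) {
        // "b" and "x" cause no transition, so only "a" and the final "b" trigger a write.
        System.out.println(countPuts("b", "a", "x", "b")); // prints 2
    }
}
```

Without the isChanged() check, the same four events would cause four writes instead of two.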