[
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066521#comment-16066521
]
ASF GitHub Bot commented on FLINK-7008:
---------------------------------------
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/4195#discussion_r124550121
--- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---
@@ -297,6 +297,85 @@ public void testKeyedAdvancingTimeWithoutElements() throws Exception {
}
@Test
+ public void testKeyedCEPOperatorNFAChanged() throws Exception {
--- End diff --
I meant checking whether resetting the flag works correctly. We could do
that by verifying the number of invocations of the `update` method. We should
also test it with both the RocksDB and in-memory state backends.
We can do it with Mockito. Just a suggestion for the code:
    @Test
    public void testKeyedCEPOperatorNFAChanged() throws Exception {
        String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
        rocksDBStateBackend.setDbStoragePath(rocksDbPath);

        KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>(
            Event.createTypeSerializer(),
            true,
            IntSerializer.INSTANCE,
            new SimpleNFAFactory(),
            true);
        OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);

        try {
            harness.setStateBackend(rocksDBStateBackend);
            harness.open();

            final ValueState nfaOperatorState = Whitebox.<ValueState>getInternalState(operator, "nfaOperatorState");
            final ValueState nfaOperatorStateSpy = Mockito.spy(nfaOperatorState);
            Whitebox.setInternalState(operator, "nfaOperatorState", nfaOperatorStateSpy);

            Event startEvent = new Event(42, "c", 1.0);
            SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0);
            Event endEvent = new Event(42, "b", 1.0);

            harness.processElement(new StreamRecord<>(startEvent, 1L));
            harness.processElement(new StreamRecord<>(new Event(42, "d", 1.0), 4L));
            harness.processElement(new StreamRecord<Event>(middleEvent, 4L));
            harness.processElement(new StreamRecord<>(endEvent, 4L));

            Mockito.verify(nfaOperatorStateSpy, Mockito.times(3)).update(Mockito.any());

            // get and verify the output
            Queue<Object> result = harness.getOutput();
            assertEquals(1, result.size());
            verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
        } finally {
            harness.close();
        }
    }
> Update NFA state only when the NFA changes.
> -------------------------------------------
>
> Key: FLINK-7008
> URL: https://issues.apache.org/jira/browse/FLINK-7008
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Affects Versions: 1.3.1
> Reporter: Kostas Kloudas
> Assignee: Dian Fu
> Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we
> update the NFA state every time the NFA is touched. This leads to redundant
> puts/gets to the state when there are no changes to the NFA itself.
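
The idea in the issue description (write the NFA back to state only when it
actually changed) can be illustrated with a small standalone sketch. This is
not the Flink implementation: `CountingState`, `ToyNfa`, and `countUpdates`
are hypothetical names made up for the illustration, and the toy automaton
simply matches "c", then "a", then "b". It assumes a "changed" flag on the
automaton that is reset before each write.

```java
public class NfaUpdateSketch {

    /** Hypothetical stand-in for a keyed value state; it only counts writes. */
    static final class CountingState {
        private ToyNfa stored;
        private int updateCalls;

        ToyNfa value() {
            return stored;
        }

        void update(ToyNfa nfa) {
            stored = nfa;
            updateCalls++;
        }
    }

    /** Toy automaton (an assumption, not Flink's NFA): matches "c", "a", "b" in order. */
    static final class ToyNfa {
        private static final String[] PATTERN = {"c", "a", "b"};
        private int position;
        private boolean changed;

        void process(String event) {
            // Only an event that advances the pattern modifies the automaton.
            if (position < PATTERN.length && PATTERN[position].equals(event)) {
                position++;
                changed = true;
            }
        }

        boolean isChanged() {
            return changed;
        }

        void resetChanged() {
            changed = false;
        }
    }

    /** Feeds the events through the toy NFA and returns the number of state writes. */
    static int countUpdates(String... events) {
        CountingState state = new CountingState();
        for (String event : events) {
            ToyNfa nfa = state.value();
            if (nfa == null) {
                nfa = new ToyNfa();
            }
            nfa.process(event);
            // Write back only when processing changed the automaton,
            // and reset the flag so the next event starts clean.
            if (nfa.isChanged()) {
                nfa.resetChanged();
                state.update(nfa);
            }
        }
        return state.updateCalls;
    }

    public static void main(String[] args) {
        // Four events, one of which ("d") does not touch the toy automaton.
        System.out.println(countUpdates("c", "d", "a", "b")); // prints 3
    }
}
```

In this toy, the unrelated "d" event causes no write, so four events produce
three calls to `update`. If the flag were never reset, every later event
would trigger a write, which is what counting the invocations is meant to catch.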