dianfu commented on code in PR #19295:
URL: https://github.com/apache/flink/pull/19295#discussion_r895598233
##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java:
##########
@@ -261,15 +261,31 @@ public Collection<Tuple2<Map<String, List<T>>, Long>>
advanceTime(
final NFAState nfaState,
final long timestamp)
throws Exception {
+ return advanceTimeAndHandlePendingState(sharedBufferAccessor,
nfaState, timestamp).f1;
+ }
+
+ public Tuple2<Collection<Map<String, List<T>>>,
Collection<Tuple2<Map<String, List<T>>, Long>>>
+ advanceTimeAndHandlePendingState(
Review Comment:
There is no need to introduce advanceTimeAndHandlePendingState. We could
just update the signature of advanceTime if needed.
##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##########
@@ -304,6 +305,18 @@ private State<T> createMiddleStates(final State<T>
sinkState) {
if (currentPattern.getQuantifier().getConsumingStrategy()
== Quantifier.ConsumingStrategy.NOT_FOLLOW) {
// skip notFollow patterns, they are converted into edge
conditions
+ if (currentPattern.getWindowTime() != null
Review Comment:
should use `windowTime` instead of `currentPattern.getWindowTime()`. This
also means that we need to calculate `windowTime` in advance.
##########
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java:
##########
@@ -630,6 +630,102 @@ public String select(Map<String, List<Event>> pattern) {
assertEquals(expected, resultList);
}
+ @Test
+ public void testNotFollowedByWithIn() throws Exception {
Review Comment:
Could we support the following pattern? If so, it would be great to add an
ITCase.
```
Pattern.begin('A').notFollowedBy('B').followedBy('C').times(0,
2).withIn(Time.milliseconds(3))
```
##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##########
@@ -158,9 +158,10 @@ public static boolean canProduceEmptyMatches(final
Pattern<?, ?> pattern) {
*/
void compileFactory() {
if (currentPattern.getQuantifier().getConsumingStrategy()
- == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
+ == Quantifier.ConsumingStrategy.NOT_FOLLOW
Review Comment:
Currently, the windowTime is the minimum of all window times and so it may
happen that the window time is defined in the other Pattern. What about moving
this check to the end of this method?
##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##########
@@ -304,6 +305,18 @@ private State<T> createMiddleStates(final State<T>
sinkState) {
if (currentPattern.getQuantifier().getConsumingStrategy()
== Quantifier.ConsumingStrategy.NOT_FOLLOW) {
// skip notFollow patterns, they are converted into edge
conditions
+ if (currentPattern.getWindowTime() != null
+ && currentPattern.getWindowTime().toMilliseconds()
> 0
+ && sinkState.isFinal()) {
+ final State<T> notFollow =
+ createState(currentPattern.getName(),
State.StateType.Pending);
+ final IterativeCondition<T> notCondition =
getTakeCondition(currentPattern);
+ final State<T> stopState =
+ createStopState(notCondition,
currentPattern.getName());
+ notFollow.addTake(stopState, notCondition);
Review Comment:
```suggestion
notFollow.addProceed(stopState, notCondition);
```
##########
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java:
##########
@@ -304,6 +305,18 @@ private State<T> createMiddleStates(final State<T>
sinkState) {
if (currentPattern.getQuantifier().getConsumingStrategy()
== Quantifier.ConsumingStrategy.NOT_FOLLOW) {
// skip notFollow patterns, they are converted into edge
conditions
+ if (currentPattern.getWindowTime() != null
+ && currentPattern.getWindowTime().toMilliseconds()
> 0
+ && sinkState.isFinal()) {
Review Comment:
```suggestion
&& lastSink.isFinal()) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]