Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4143#discussion_r123713960
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -514,25 +524,46 @@ private void addStopStateToLooping(final State<T> loopingState) {
      */
     @SuppressWarnings("unchecked")
     private State<T> createInitOptionalStateOfZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
-        final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
+        final IterativeCondition<T> takeCondition = extendWithUntilCondition(
+            (IterativeCondition<T>) currentPattern.getCondition(),
+            (IterativeCondition<T>) currentPattern.getUntilCondition()
+        );
         final State<T> firstState = createState(currentPattern.getName(), State.StateType.Normal);
         firstState.addProceed(lastSink, BooleanConditions.<T>trueFunction());
-        firstState.addTake(loopingState, currentCondition);
+        firstState.addTake(loopingState, takeCondition);
         final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
         if (ignoreFunction != null) {
             final State<T> firstStateWithoutProceed = createState(currentPattern.getName(), State.StateType.Normal);
             firstState.addIgnore(firstStateWithoutProceed, ignoreFunction);
             firstStateWithoutProceed.addIgnore(ignoreFunction);
-            firstStateWithoutProceed.addTake(loopingState, currentCondition);
+            firstStateWithoutProceed.addTake(loopingState, takeCondition);
             addStopStates(firstStateWithoutProceed);
         }
         return firstState;
     }
     /**
+     * This method extends the given condition with stop(until) condition if necessary.
+     * The until condition needs to be applied only if both of the given conditions are not null.
+     *
+     * @param condition the condition to extend
+     * @param untilCondition the until condition to join with the given condition
+     * @return condition with AND applied or the original condition
+     */
+    private IterativeCondition<T> extendWithUntilCondition(
+            IterativeCondition<T> condition,
+            IterativeCondition<T> untilCondition) {
+        if (untilCondition != null && condition != null) {
+            return new AndCondition<>(new NotCondition<>(untilCondition), condition);
+        } else {
+            return condition;
+        }
+    }
+
--- End diff --
As written, this does not support patterns that have no condition and only
an until condition: when `condition` is null, the until condition is
silently dropped. For example:
```
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    private static final long serialVersionUID = 5726188262756267490L;

    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("c");
    }
}).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);
```
To support this, I think changing the method body to the following will do
the job:
```
if (untilCondition != null && condition != null) {
    return new AndCondition<>(new NotCondition<>(untilCondition), condition);
}
if (condition != null) {
    return condition;
}
if (untilCondition != null) {
    return new NotCondition<>(untilCondition);
}
return null;
```
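To make the four cases easier to check in isolation, here is a minimal standalone sketch of the same branching. It is not Flink code: `java.util.function.Predicate` stands in for `IterativeCondition`, `negate()` and `and()` stand in for `NotCondition` and `AndCondition`, and the class name `UntilConditionSketch` is made up for illustration.

```java
import java.util.function.Predicate;

// Standalone sketch, NOT the Flink implementation.
// Assumption: Predicate<T> plays the role of IterativeCondition<T>.
public class UntilConditionSketch {

    /**
     * Combines a take condition with an optional until condition.
     * Returns null only when both arguments are null.
     */
    static <T> Predicate<T> extendWithUntilCondition(
            Predicate<T> condition,
            Predicate<T> untilCondition) {
        if (untilCondition != null && condition != null) {
            // Both set: take only while NOT until AND condition.
            return untilCondition.negate().and(condition);
        }
        if (condition != null) {
            // Only the take condition is set: use it unchanged.
            return condition;
        }
        if (untilCondition != null) {
            // Only the until condition is set: take everything until it fires.
            return untilCondition.negate();
        }
        // Neither is set.
        return null;
    }

    public static void main(String[] args) {
        Predicate<String> isA = s -> s.equals("a");
        Predicate<String> isStop = s -> s.equals("stop");

        Predicate<String> both = extendWithUntilCondition(isA, isStop);
        Predicate<String> untilOnly = extendWithUntilCondition(null, isStop);

        System.out.println("both accepts a: " + both.test("a"));
        System.out.println("both accepts stop: " + both.test("stop"));
        System.out.println("untilOnly accepts b: " + untilOnly.test("b"));
        System.out.println("untilOnly accepts stop: " + untilOnly.test("stop"));
        System.out.println("neither is null: " + (extendWithUntilCondition(null, null) == null));
    }
}
```

The until-only branch is the one the current diff is missing: with the `else { return condition; }` version, that case returns null and the until condition never takes effect.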
To cover this properly, you could also add a test to
`UntilConditionITCase`, something like:
```
List<StreamRecord<Event>> inputEvents = new ArrayList<>();

Event startEvent = new Event(40, "c", 1.0);
Event middleEvent1 = new Event(41, "a", 2.0);
Event middleEvent2 = new Event(42, "a", 3.0);
Event startEvent2 = new Event(40, "d", 1.0);
Event breaking = new Event(44, "a", 5.0);
Event ignored = new Event(45, "a", 6.0);

inputEvents.add(new StreamRecord<>(startEvent, 1));
inputEvents.add(new StreamRecord<>(middleEvent1, 3));
inputEvents.add(new StreamRecord<>(middleEvent2, 4));
inputEvents.add(new StreamRecord<>(startEvent2, 4));
inputEvents.add(new StreamRecord<>(breaking, 6));
inputEvents.add(new StreamRecord<>(ignored, 7));

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    private static final long serialVersionUID = 5726188262756267490L;

    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("c");
    }
}).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);

NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);

final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
```
You could also add a test for the case where this method returns `null`,
i.e. when neither a condition nor an until condition is set.