[
https://issues.apache.org/jira/browse/FLINK-6418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16060680#comment-16060680
]
ASF GitHub Bot commented on FLINK-6418:
---------------------------------------
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4143#discussion_r123713960
--- Diff:
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
---
@@ -514,25 +524,46 @@ private void addStopStateToLooping(final State<T>
loopingState) {
*/
@SuppressWarnings("unchecked")
private State<T> createInitOptionalStateOfZeroOrMore(final
State<T> loopingState, final State<T> lastSink) {
- final IterativeCondition<T> currentCondition =
(IterativeCondition<T>) currentPattern.getCondition();
+ final IterativeCondition<T> takeCondition =
extendWithUntilCondition(
+ (IterativeCondition<T>)
currentPattern.getCondition(),
+ (IterativeCondition<T>)
currentPattern.getUntilCondition()
+ );
final State<T> firstState =
createState(currentPattern.getName(), State.StateType.Normal);
firstState.addProceed(lastSink,
BooleanConditions.<T>trueFunction());
- firstState.addTake(loopingState, currentCondition);
+ firstState.addTake(loopingState, takeCondition);
final IterativeCondition<T> ignoreFunction =
getIgnoreCondition(currentPattern);
if (ignoreFunction != null) {
final State<T> firstStateWithoutProceed =
createState(currentPattern.getName(), State.StateType.Normal);
firstState.addIgnore(firstStateWithoutProceed,
ignoreFunction);
firstStateWithoutProceed.addIgnore(ignoreFunction);
- firstStateWithoutProceed.addTake(loopingState,
currentCondition);
+ firstStateWithoutProceed.addTake(loopingState,
takeCondition);
addStopStates(firstStateWithoutProceed);
}
return firstState;
}
/**
+ * This method extends the given condition with stop(until)
condition if necessary.
+ * The until condition needs to be applied only if both of the
given conditions are not null.
+ *
+ * @param condition the condition to extend
+ * @param untilCondition the until condition to join with the
given condition
+ * @return condition with AND applied or the original condition
+ */
+ private IterativeCondition<T> extendWithUntilCondition(
+ IterativeCondition<T> condition,
+ IterativeCondition<T> untilCondition) {
+ if (untilCondition != null && condition != null) {
+ return new AndCondition<>(new
NotCondition<>(untilCondition), condition);
+ } else {
+ return condition;
+ }
+ }
+
--- End diff --
The way this is, now you do not allow patterns with no condition and only
until condition, like:
```
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new
SimpleCondition<Event>() {
private static final long serialVersionUID =
5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
}).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);
```
To do this, I think that transforming the method to the following will do
the job:
```
if (untilCondition != null && condition != null) {
return new AndCondition<>(new
NotCondition<>(untilCondition), condition);
}
if (condition != null) {
return condition;
}
if (untilCondition != null) {
return new NotCondition<>(untilCondition);
}
return null;
```
And to do this properly, you could also add a test in the
`UntilConditionITCase` like:
```
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
Event middleEvent1 = new Event(41, "a", 2.0);
Event middleEvent2 = new Event(42, "a", 3.0);
Event startEvent2 = new Event(40, "d", 1.0);
Event breaking = new Event(44, "a", 5.0);
Event ignored = new Event(45, "a", 6.0);
inputEvents.add(new StreamRecord<>(startEvent, 1));
inputEvents.add(new StreamRecord<>(middleEvent1, 3));
inputEvents.add(new StreamRecord<>(middleEvent2, 4));
inputEvents.add(new StreamRecord<>(startEvent2, 4));
inputEvents.add(new StreamRecord<>(breaking, 6));
inputEvents.add(new StreamRecord<>(ignored, 7));
Pattern<Event, ?> pattern =
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
private static final long serialVersionUID =
5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
}).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);
NFA<Event> nfa = NFACompiler.compile(pattern,
Event.createTypeSerializer(), false);
final List<List<Event>> resultingPatterns =
feedNFA(inputEvents, nfa);
```
And you could also add one for the case that this method returns `null`.
> Support for dynamic state changes in CEP patterns
> -------------------------------------------------
>
> Key: FLINK-6418
> URL: https://issues.apache.org/jira/browse/FLINK-6418
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Affects Versions: 1.3.0
> Reporter: Elias Levy
> Assignee: Dawid Wysakowicz
>
> Flink CEP library allows one to define event pattern to match where the match
> condition can be determined programmatically via the {{where}} method. Flink
> 1.3 will introduce so-called iterative conditions, which allow the predicate
> to look up events already matched by the pattern and thus be conditional on
> them.
> 1.3 also introduces to the API quantifer methods which allow one to
> declaratively specific how many times a condition must be matched before
> there is a state change.
> Alas, there are use cases where the quantifier must be determined dynamically
> based on the events matched by the pattern so far. Therefore, I propose the
> adding of a new {{Pattern}}: {{until}}.
> Like the new iterative variant of {{where}}, {{until}} would take a predicate
> function and a context that provides access to events already matched. But
> whereas {{where}} determines if an event is accepted by the pattern,
> {{until}} determines whether is pattern should move on to the next state.
> In our particular use case, we have a pattern where an event is matched a
> number of times, but depending on the event type, the number (threshold) for
> the pattern to match is different. We could decompose the pattern into
> multiple similar patterns, but that could be inefficient if we have many such
> patterns. If the functionality of {{until}} were available, we could make do
> with a single pattern.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)