[ 
https://issues.apache.org/jira/browse/FLINK-6418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16060680#comment-16060680
 ] 

ASF GitHub Bot commented on FLINK-6418:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4143#discussion_r123713960
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -514,25 +524,46 @@ private void addStopStateToLooping(final State<T> loopingState) {
                 */
                @SuppressWarnings("unchecked")
                private State<T> createInitOptionalStateOfZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
    -                   final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
    +                   final IterativeCondition<T> takeCondition = extendWithUntilCondition(
    +                           (IterativeCondition<T>) currentPattern.getCondition(),
    +                           (IterativeCondition<T>) currentPattern.getUntilCondition()
    +                   );
     
                        final State<T> firstState = createState(currentPattern.getName(), State.StateType.Normal);
                        firstState.addProceed(lastSink, BooleanConditions.<T>trueFunction());
    -                   firstState.addTake(loopingState, currentCondition);
    +                   firstState.addTake(loopingState, takeCondition);
     
                        final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
                        if (ignoreFunction != null) {
                                final State<T> firstStateWithoutProceed = createState(currentPattern.getName(), State.StateType.Normal);
                                firstState.addIgnore(firstStateWithoutProceed, ignoreFunction);
                                firstStateWithoutProceed.addIgnore(ignoreFunction);
    -                           firstStateWithoutProceed.addTake(loopingState, currentCondition);
    +                           firstStateWithoutProceed.addTake(loopingState, takeCondition);
     
                                addStopStates(firstStateWithoutProceed);
                        }
                        return firstState;
                }
     
                /**
    +            * This method extends the given condition with stop(until) condition if necessary.
    +            * The until condition needs to be applied only if both of the given conditions are not null.
    +            *
    +            * @param condition the condition to extend
    +            * @param untilCondition the until condition to join with the given condition
    +            * @return condition with AND applied or the original condition
    +            */
    +           private IterativeCondition<T> extendWithUntilCondition(
    +                           IterativeCondition<T> condition,
    +                           IterativeCondition<T> untilCondition) {
    +                   if (untilCondition != null && condition != null) {
    +                           return new AndCondition<>(new NotCondition<>(untilCondition), condition);
    +                   } else {
    +                           return condition;
    +                   }
    +           }
    +
    --- End diff --
    
    As written, the method does not handle patterns that have an until 
condition but no regular condition: it returns `condition`, which is `null`, 
so the until condition is dropped. For example:
    
    ```
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
        private static final long serialVersionUID = 5726188262756267490L;

        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("c");
        }
    }).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);
    ```
    
    To support this, I think changing the method body to the following will 
do the job:
    
    ```
    if (untilCondition != null && condition != null) {
        return new AndCondition<>(new NotCondition<>(untilCondition), condition);
    }
    if (condition != null) {
        return condition;
    }
    if (untilCondition != null) {
        return new NotCondition<>(untilCondition);
    }
    return null;
    ```
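    
    For illustration only, the four cases above can be exercised in isolation 
with a minimal, self-contained sketch. It assumes `java.util.function.Predicate` 
as a stand-in for `IterativeCondition`, with `negate()` and `and()` playing the 
roles of `NotCondition` and `AndCondition`. The class name `UntilConditions` is 
hypothetical and is not part of Flink:
    
    ```java
    import java.util.function.Predicate;

    // Hypothetical stand-in for the NFACompiler helper (not Flink code).
    // Predicate<T> replaces IterativeCondition<T>; negate()/and() replace
    // NotCondition/AndCondition.
    final class UntilConditions {
        private UntilConditions() {}

        static <T> Predicate<T> extendWithUntilCondition(
                Predicate<T> condition, Predicate<T> untilCondition) {
            if (untilCondition != null && condition != null) {
                // Both present: take an event only while "until" has not fired.
                return untilCondition.negate().and(condition);
            }
            if (condition != null) {
                // Only the regular condition: use it unchanged.
                return condition;
            }
            if (untilCondition != null) {
                // Only "until": take every event until it fires.
                return untilCondition.negate();
            }
            // Neither condition is set.
            return null;
        }
    }
    ```
    
    In this sketch, a pattern with only an until condition gets the negated 
until condition as its take condition, and a pattern with neither gets `null`.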
    
    To cover this case, you could also add a test to `UntilConditionITCase`, 
like:
    
    ```
    List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    
    Event startEvent = new Event(40, "c", 1.0);
    Event middleEvent1 = new Event(41, "a", 2.0);
    Event middleEvent2 = new Event(42, "a", 3.0);
    Event startEvent2 = new Event(40, "d", 1.0);
    Event breaking = new Event(44, "a", 5.0);
    Event ignored = new Event(45, "a", 6.0);
    
    inputEvents.add(new StreamRecord<>(startEvent, 1));
    inputEvents.add(new StreamRecord<>(middleEvent1, 3));
    inputEvents.add(new StreamRecord<>(middleEvent2, 4));
    inputEvents.add(new StreamRecord<>(startEvent2, 4));
    inputEvents.add(new StreamRecord<>(breaking, 6));
    inputEvents.add(new StreamRecord<>(ignored, 7));
    
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
        private static final long serialVersionUID = 5726188262756267490L;
    
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("c");
        }
    }).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);
    
    NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    
    final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    ```
    
    You could also add a test for the case where this method returns `null`, 
i.e. when neither a condition nor an until condition is set.


> Support for dynamic state changes in CEP patterns
> -------------------------------------------------
>
>                 Key: FLINK-6418
>                 URL: https://issues.apache.org/jira/browse/FLINK-6418
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Elias Levy
>            Assignee: Dawid Wysakowicz
>
> The Flink CEP library allows one to define an event pattern to match, where the 
> match condition can be determined programmatically via the {{where}} method.  
> Flink 1.3 will introduce so-called iterative conditions, which allow the 
> predicate to look up events already matched by the pattern and thus be 
> conditional on them.
> 1.3 also introduces quantifier methods to the API, which allow one to 
> declaratively specify how many times a condition must be matched before 
> there is a state change.
> Alas, there are use cases where the quantifier must be determined dynamically 
> based on the events matched by the pattern so far.  Therefore, I propose 
> adding a new {{Pattern}} method: {{until}}.
> Like the new iterative variant of {{where}}, {{until}} would take a predicate 
> function and a context that provides access to events already matched.  But 
> whereas {{where}} determines if an event is accepted by the pattern, 
> {{until}} determines whether the pattern should move on to the next state.
> In our particular use case, we have a pattern where an event is matched a 
> number of times, but depending on the event type, the number (threshold) for 
> the pattern to match is different.  We could decompose the pattern into 
> multiple similar patterns, but that could be inefficient if we have many such 
> patterns.  If the functionality of {{until}} were available, we could make do 
> with a single pattern.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
